Merge branch 'master' into 3.0.0

pull/1303/head
Nikita 7 years ago
commit abace2c804

@ -23,8 +23,10 @@ import org.redisson.api.RPatternTopic;
import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.PatternStatusListener; import org.redisson.api.listener.PatternStatusListener;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.PubSubConnectionEntry; import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.AsyncSemaphore;
@ -68,10 +70,18 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
return System.identityHashCode(pubSubListener); return System.identityHashCode(pubSubListener);
} }
protected void acquire(AsyncSemaphore semaphore) {
MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!semaphore.tryAcquire(timeout)) {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
}
}
@Override @Override
public void removeListener(int listenerId) { public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly(); acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {
@ -90,7 +100,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override @Override
public void removeAllListeners() { public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly(); acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {
@ -109,7 +119,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override @Override
public void removeListener(PatternMessageListener<M> listener) { public void removeListener(PatternMessageListener<M> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly(); acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {

@ -24,9 +24,11 @@ import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener; import org.redisson.api.listener.StatusListener;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.PubSubConnectionEntry; import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonObjectFactory;
@ -126,7 +128,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override @Override
public void removeAllListeners() { public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly(); acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {
@ -141,11 +143,19 @@ public class RedissonTopic<M> implements RTopic<M> {
semaphore.release(); semaphore.release();
} }
} }
protected void acquire(AsyncSemaphore semaphore) {
MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!semaphore.tryAcquire(timeout)) {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
}
}
@Override @Override
public void removeListener(MessageListener<?> listener) { public void removeListener(MessageListener<?> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly(); acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {
@ -165,7 +175,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override @Override
public void removeListener(int listenerId) { public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly(); acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {

@ -93,7 +93,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (data == null) { if (data == null) {
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel()); decode(in, null, null, ctx.channel());
} }
} else if (data instanceof CommandData) { } else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
@ -105,6 +105,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
} catch (Exception e) { } catch (Exception e) {
cmd.tryFailure(e); cmd.tryFailure(e);
throw e;
} }
} else if (data instanceof CommandsData) { } else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data; CommandsData commands = (CommandsData)data;
@ -112,6 +113,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decodeCommandBatch(ctx, in, data, commands); decodeCommandBatch(ctx, in, data, commands);
} catch (Exception e) { } catch (Exception e) {
commands.getPromise().tryFailure(e); commands.getPromise().tryFailure(e);
throw e;
} }
return; return;
} }
@ -172,7 +174,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandsData commandBatch) { CommandsData commandBatch) throws Exception {
int i = state().getBatchIndex(); int i = state().getBatchIndex();
Throwable error = null; Throwable error = null;
@ -211,6 +213,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
} catch (Exception e) { } catch (Exception e) {
commandData.tryFailure(e); commandData.tryFailure(e);
throw e;
} }
i++; i++;
if (commandData != null && !commandData.isSuccess()) { if (commandData != null && !commandData.isSuccess()) {

@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisRedirectException; import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
@ -35,7 +36,7 @@ public class BatchCommandData<T, R> extends CommandData<T, R> implements Compara
private final AtomicReference<RedisRedirectException> redirectError = new AtomicReference<RedisRedirectException>(); private final AtomicReference<RedisRedirectException> redirectError = new AtomicReference<RedisRedirectException>();
public BatchCommandData(RedisCommand<T> command, Object[] params, int index) { public BatchCommandData(RedisCommand<T> command, Object[] params, int index) {
this(new RedissonPromise<R>(), null, command, params, index); this(new RedissonPromise<R>(), StringCodec.INSTANCE, command, params, index);
} }
public BatchCommandData(RPromise<R> promise, Codec codec, RedisCommand<T> command, Object[] params, int index) { public BatchCommandData(RPromise<R> promise, Codec codec, RedisCommand<T> command, Object[] params, int index) {

@ -21,6 +21,12 @@ import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
* @param <R> result type
*/
public interface Decoder<R> { public interface Decoder<R> {
R decode(ByteBuf buf, State state) throws IOException; R decode(ByteBuf buf, State state) throws IOException;

@ -324,6 +324,8 @@ public interface RedisCommands {
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SLAVES", RedisCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SLAVES",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder())); new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder()));
RedisCommand<List<Map<String, String>>> SENTINEL_SENTINELS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SENTINELS",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder()));
RedisStrictCommand<Void> CLUSTER_ADDSLOTS = new RedisStrictCommand<Void>("CLUSTER", "ADDSLOTS"); RedisStrictCommand<Void> CLUSTER_ADDSLOTS = new RedisStrictCommand<Void>("CLUSTER", "ADDSLOTS");
RedisStrictCommand<Void> CLUSTER_REPLICATE = new RedisStrictCommand<Void>("CLUSTER", "REPLICATE"); RedisStrictCommand<Void> CLUSTER_REPLICATE = new RedisStrictCommand<Void>("CLUSTER", "REPLICATE");

@ -74,8 +74,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<URI, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap();
private ScheduledFuture<?> monitorFuture; private ScheduledFuture<?> monitorFuture;
@ -97,7 +95,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Throwable lastException = null; Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>(); List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) { for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr); RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
try { try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
@ -186,43 +184,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result; return result;
} }
private void close(RedisConnection conn) {
if (nodeConnections.values().remove(conn)) {
conn.closeAsync();
}
}
private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return RedissonPromise.newSucceededFuture(connection);
}
RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
RFuture<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
RedisConnection connection = future.getNow();
if (connection.isActive()) {
nodeConnections.put(addr, connection);
result.trySuccess(connection);
} else {
connection.closeAsync();
result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
}
}
});
return result;
}
private RFuture<Collection<RFuture<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) { private RFuture<Collection<RFuture<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
RedisException e = new RedisException("Failed to add master: " + RedisException e = new RedisException("Failed to add master: " +
@ -237,7 +198,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
final RPromise<Collection<RFuture<Void>>> result = new RedissonPromise<Collection<RFuture<Void>>>(); final RPromise<Collection<RFuture<Void>>> result = new RedissonPromise<Collection<RFuture<Void>>>();
RFuture<RedisConnection> connectionFuture = connect(cfg, partition.getMasterAddress()); RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null);
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -390,7 +351,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return; return;
} }
final URI uri = iterator.next(); final URI uri = iterator.next();
RFuture<RedisConnection> connectionFuture = connect(cfg, uri); RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, null);
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -414,7 +375,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception { public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause()); log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause());
close(connection); closeNodeConnection(connection);
getShutdownLatch().release(); getShutdownLatch().release();
scheduleClusterChangeCheck(cfg, iterator); scheduleClusterChangeCheck(cfg, iterator);
return; return;
@ -782,15 +743,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void shutdown() { public void shutdown() {
monitorFuture.cancel(true); monitorFuture.cancel(true);
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(); closeNodeConnections();
for (RedisConnection connection : nodeConnections.values()) {
RFuture<Void> future = connection.getRedisClient().shutdownAsync();
futures.add(future);
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
super.shutdown(); super.shutdown();
} }

@ -35,6 +35,11 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
* Database index used for Redis connection * Database index used for Redis connection
*/ */
private int database = 0; private int database = 0;
/**
* Sentinel scan interval in milliseconds
*/
private int scanInterval = 1000;
public SentinelServersConfig() { public SentinelServersConfig() {
} }
@ -44,6 +49,7 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
setSentinelAddresses(config.getSentinelAddresses()); setSentinelAddresses(config.getSentinelAddresses());
setMasterName(config.getMasterName()); setMasterName(config.getMasterName());
setDatabase(config.getDatabase()); setDatabase(config.getDatabase());
setScanInterval(config.getScanInterval());
} }
/** /**
@ -94,4 +100,18 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
return database; return database;
} }
public int getScanInterval() {
return scanInterval;
}
/**
* Sentinel scan interval in milliseconds
*
* @param scanInterval in milliseconds
* @return config
*/
public SentinelServersConfig setScanInterval(int scanInterval) {
this.scanInterval = scanInterval;
return this;
}
} }

@ -33,12 +33,20 @@ public class CountableListener<T> implements FutureListener<Object> {
protected final RPromise<T> result; protected final RPromise<T> result;
protected final T value; protected final T value;
public CountableListener() {
this(null, null);
}
public CountableListener(RPromise<T> result, T value) { public CountableListener(RPromise<T> result, T value) {
super(); super();
this.result = result; this.result = result;
this.value = value; this.value = value;
} }
public void setCounter(int newValue) {
counter.set(newValue);
}
public void incCounter() { public void incCounter() {
counter.incrementAndGet(); counter.incrementAndGet();
} }
@ -46,13 +54,21 @@ public class CountableListener<T> implements FutureListener<Object> {
@Override @Override
public void operationComplete(Future<Object> future) throws Exception { public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
result.tryFailure(future.cause()); if (result != null) {
result.tryFailure(future.cause());
}
return; return;
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
result.trySuccess(value); onSuccess(value);
if (result != null) {
result.trySuccess(value);
}
} }
} }
protected void onSuccess(T value) {
}
} }

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -39,6 +40,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisNodeNotFoundException; import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
@ -54,7 +56,6 @@ import org.redisson.config.TransportMode;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -163,6 +164,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final DnsAddressResolverGroup resolverGroup; protected final DnsAddressResolverGroup resolverGroup;
private final Map<Object, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
{ {
for (int i = 0; i < locks.length; i++) { for (int i = 0; i < locks.length; i++) {
locks[i] = new AsyncSemaphore(1); locks[i] = new AsyncSemaphore(1);
@ -224,6 +227,63 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.commandExecutor = new CommandSyncService(this); this.commandExecutor = new CommandSyncService(this);
} }
protected void closeNodeConnections() {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (RedisConnection connection : nodeConnections.values()) {
RFuture<Void> future = connection.getRedisClient().shutdownAsync();
futures.add(future);
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
protected void closeNodeConnection(RedisConnection conn) {
if (nodeConnections.values().remove(conn)) {
conn.closeAsync();
}
}
protected RFuture<RedisConnection> connectToNode(BaseMasterSlaveServersConfig<?> cfg, final URI addr, RedisClient client) {
final Object key;
if (client != null) {
key = client;
} else {
key = addr;
}
RedisConnection connection = nodeConnections.get(key);
if (connection != null) {
return RedissonPromise.newSucceededFuture(connection);
}
if (addr != null) {
client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
}
final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
RFuture<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
RedisConnection connection = future.getNow();
if (connection.isActive()) {
nodeConnections.put(key, connection);
result.trySuccess(connection);
} else {
connection.closeAsync();
result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
}
}
});
return result;
}
public boolean isClusterMode() { public boolean isClusterMode() {
return false; return false;
} }
@ -432,33 +492,35 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
@Override @Override
public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?>... listeners) { public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName); return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, listeners);
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
lock.acquire(new Runnable() {
@Override
public void run() {
RFuture<PubSubConnectionEntry> future = psubscribe(channelName, codec, lock, listeners);
future.addListener(new TransferListener<PubSubConnectionEntry>(result));
}
});
return result;
} }
@Override
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) { public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>(); RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners); subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
return promise; return promise;
} }
public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?>... listeners) { @Override
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners);
}
private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final String channelName,
final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName); final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>(); final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
lock.acquire(new Runnable() { lock.acquire(new Runnable() {
@Override @Override
public void run() { public void run() {
RFuture<PubSubConnectionEntry> future = subscribe(codec, channelName, lock, listeners); if (result.isDone()) {
future.addListener(new TransferListener<PubSubConnectionEntry>(result)); lock.release();
return;
}
subscribe(codec, channelName, result, type, lock, listeners);
} }
}); });
return result; return result;
@ -487,6 +549,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void run() { public void run() {
if (promise.isDone()) { if (promise.isDone()) {
lock.release();
freePubSubLock.release();
return; return;
} }
@ -536,8 +600,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() { connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
lock.release(); if (!promise.trySuccess(connEntry)) {
promise.trySuccess(connEntry); for (RedisPubSubListener<?> listener : listeners) {
connEntry.removeListener(channelName, listener);
}
if (!connEntry.hasListeners(channelName)) {
unsubscribe(channelName, lock);
} else {
lock.release();
}
} else {
lock.release();
}
} }
}); });
} }

@ -268,9 +268,12 @@ public class MasterSlaveEntry {
@Override @Override
public void operationComplete(Future<PubSubConnectionEntry> future) public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception { throws Exception {
if (future.isSuccess()) { if (!future.isSuccess()) {
log.debug("resubscribed listeners of '{}' channel to {}", channelName, future.getNow().getConnection().getRedisClient()); subscribe(channelName, listeners, subscribeCodec);
return;
} }
log.debug("resubscribed listeners of '{}' channel to '{}'", channelName, future.getNow().getConnection().getRedisClient());
} }
}); });
} }
@ -292,7 +295,7 @@ public class MasterSlaveEntry {
private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners, private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) { final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, null); RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() { subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override @Override
public void operationComplete(Future<PubSubConnectionEntry> future) public void operationComplete(Future<PubSubConnectionEntry> future)
@ -302,11 +305,7 @@ public class MasterSlaveEntry {
return; return;
} }
PubSubConnectionEntry newEntry = future.getNow(); log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient());
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
} }
}); });
} }

@ -16,17 +16,12 @@
package org.redisson.connection; package org.redisson.connection;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
@ -35,8 +30,6 @@ import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReplicatedServersConfig; import org.redisson.config.ReplicatedServersConfig;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -60,8 +53,6 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
private AtomicReference<URI> currentMaster = new AtomicReference<URI>(); private AtomicReference<URI> currentMaster = new AtomicReference<URI>();
private final Map<URI, RedisConnection> nodeConnections = new HashMap<URI, RedisConnection>();
private ScheduledFuture<?> monitorFuture; private ScheduledFuture<?> monitorFuture;
private enum Role { private enum Role {
@ -76,7 +67,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
initTimer(this.config); initTimer(this.config);
for (URI addr : cfg.getNodeAddresses()) { for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr); RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
connectionFuture.awaitUninterruptibly(); connectionFuture.awaitUninterruptibly();
RedisConnection connection = connectionFuture.getNow(); RedisConnection connection = connectionFuture.getNow();
if (connection == null) { if (connection == null) {
@ -115,37 +106,6 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
return res; return res;
} }
private RFuture<RedisConnection> connect(BaseMasterSlaveServersConfig<?> cfg, final URI addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return RedissonPromise.newSucceededFuture(connection);
}
RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
RFuture<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
RedisConnection connection = future.getNow();
if (connection.isActive()) {
nodeConnections.put(addr, connection);
result.trySuccess(connection);
} else {
connection.closeAsync();
result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
}
}
});
return result;
}
private void scheduleMasterChangeCheck(final ReplicatedServersConfig cfg) { private void scheduleMasterChangeCheck(final ReplicatedServersConfig cfg) {
monitorFuture = group.schedule(new Runnable() { monitorFuture = group.schedule(new Runnable() {
@Override @Override
@ -159,7 +119,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
return; return;
} }
RFuture<RedisConnection> connectionFuture = connect(cfg, addr); RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -175,7 +135,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
return; return;
} }
RedisConnection connection = future.getNow(); final RedisConnection connection = future.getNow();
RFuture<Map<String, String>> result = connection.async(RedisCommands.INFO_REPLICATION); RFuture<Map<String, String>> result = connection.async(RedisCommands.INFO_REPLICATION);
result.addListener(new FutureListener<Map<String, String>>() { result.addListener(new FutureListener<Map<String, String>>() {
@Override @Override
@ -183,6 +143,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
throws Exception { throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error(future.cause().getMessage(), future.cause()); log.error(future.cause().getMessage(), future.cause());
closeNodeConnection(connection);
if (count.decrementAndGet() == 0) { if (count.decrementAndGet() == 0) {
scheduleMasterChangeCheck(cfg); scheduleMasterChangeCheck(cfg);
} }
@ -215,15 +176,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
public void shutdown() { public void shutdown() {
monitorFuture.cancel(true); monitorFuture.cancel(true);
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(); closeNodeConnections();
for (RedisConnection connection : nodeConnections.values()) {
RFuture<Void> future = connection.getRedisClient().shutdownAsync();
futures.add(future);
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
super.shutdown(); super.shutdown();
} }
} }

@ -17,11 +17,16 @@ package org.redisson.connection;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
@ -40,6 +45,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.SentinelServersConfig; import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -47,6 +53,7 @@ import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/** /**
@ -60,9 +67,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
private final AtomicReference<String> currentMaster = new AtomicReference<String>(); private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap(); private final Set<String> slaves = Collections.newSetFromMap(PlatformDependent.<String, Boolean>newConcurrentHashMap());
private final Set<URI> disconnectedSlaves = new HashSet<URI>(); private final Set<URI> disconnectedSlaves = new HashSet<URI>();
private ScheduledFuture<?> monitorFuture;
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
super(config); super(config);
@ -82,13 +90,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
continue; continue;
} }
// TODO async
List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
String masterHost = createAddress(master.get(0), master.get(1)); String masterHost = createAddress(master.get(0), master.get(1));
this.config.setMasterAddress(masterHost); this.config.setMasterAddress(masterHost);
currentMaster.set(masterHost); currentMaster.set(masterHost);
log.info("master: {} added", masterHost); log.info("master: {} added", masterHost);
slaves.put(masterHost, true); slaves.add(masterHost);
List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
for (Map<String, String> map : sentinelSlaves) { for (Map<String, String> map : sentinelSlaves) {
@ -103,7 +110,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String host = createAddress(ip, port); String host = createAddress(ip, port);
this.config.addSlaveAddress(host); this.config.addSlaveAddress(host);
slaves.put(host, true); slaves.add(host);
log.debug("slave {} state: {}", host, map); log.debug("slave {} state: {}", host, map);
log.info("slave: {} added", host); log.info("slave: {} added", host);
@ -113,6 +120,27 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.warn("slave: {} is down", host); log.warn("slave: {} is down", host);
} }
} }
List<Map<String, String>> sentinelSentinels = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
List<RFuture<Void>> connectionFutures = new ArrayList<RFuture<Void>>(sentinelSentinels.size());
for (Map<String, String> map : sentinelSentinels) {
if (map.isEmpty()) {
continue;
}
String ip = map.get("ip");
String port = map.get("port");
String host = createAddress(ip, port);
URI sentinelAddr = URIBuilder.create(host);
RFuture<Void> future = registerSentinel(cfg, sentinelAddr, this.config);
connectionFutures.add(future);
}
for (RFuture<Void> future : connectionFutures) {
future.awaitUninterruptibly(this.config.getConnectTimeout());
}
break; break;
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
log.warn("Can't connect to sentinel server. {}", e.getMessage()); log.warn("Can't connect to sentinel server. {}", e.getMessage());
@ -128,17 +156,182 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
initSingleEntry(); initSingleEntry();
List<RFuture<RedisPubSubConnection>> connectionFutures = new ArrayList<RFuture<RedisPubSubConnection>>(cfg.getSentinelAddresses().size()); scheduleChangeCheck(cfg, null);
for (URI addr : cfg.getSentinelAddresses()) { }
RFuture<RedisPubSubConnection> future = registerSentinel(cfg, addr, this.config);
connectionFutures.add(future); private void scheduleChangeCheck(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator) {
monitorFuture = group.schedule(new Runnable() {
@Override
public void run() {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<RedisClient> iter = iterator;
if (iter == null) {
iter = sentinels.values().iterator();
}
checkState(cfg, iter, lastException);
}
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
protected void checkState(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator, final AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) {
log.error("Can't update cluster state", lastException.get());
scheduleChangeCheck(cfg, null);
return;
}
if (!getShutdownLatch().acquire()) {
return;
} }
for (RFuture<RedisPubSubConnection> future : connectionFutures) { RedisClient client = iterator.next();
future.awaitUninterruptibly(); RFuture<RedisConnection> connectionFuture = connectToNode(null, null, client);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
lastException.set(future.cause());
getShutdownLatch().release();
checkState(cfg, iterator, lastException);
return;
}
RedisConnection connection = future.getNow();
updateState(cfg, connection, iterator);
}
});
}
protected void updateState(final SentinelServersConfig cfg, final RedisConnection connection, final Iterator<RedisClient> iterator) {
final AtomicInteger commands = new AtomicInteger(2);
FutureListener<Object> commonListener = new FutureListener<Object>() {
private final AtomicBoolean failed = new AtomicBoolean();
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (commands.decrementAndGet() == 0) {
getShutdownLatch().release();
if (failed.get()) {
scheduleChangeCheck(cfg, iterator);
} else {
scheduleChangeCheck(cfg, null);
}
}
if (!future.isSuccess() && failed.compareAndSet(false, true)) {
log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), future.cause());
closeNodeConnection(connection);
}
}
};
RFuture<List<String>> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
masterFuture.addListener(new FutureListener<List<String>>() {
@Override
public void operationComplete(Future<List<String>> future) throws Exception {
if (!future.isSuccess()) {
return;
}
List<String> master = future.getNow();
String current = currentMaster.get();
String newMaster = createAddress(master.get(0), master.get(1));
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
}
}
});
masterFuture.addListener(commonListener);
if (!config.checkSkipSlavesInit()) {
RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
commands.incrementAndGet();
slavesFuture.addListener(new FutureListener<List<Map<String, String>>>() {
@Override
public void operationComplete(Future<List<Map<String, String>>> future) throws Exception {
if (!future.isSuccess()) {
return;
}
List<Map<String, String>> slavesMap = future.getNow();
final Set<String> currentSlaves = new HashSet<String>(slavesMap.size());
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
continue;
}
String ip = map.get("ip");
String port = map.get("port");
String flags = map.get("flags");
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
if (!isUseSameMaster(ip, port, masterHost, masterPort)) {
continue;
}
if (flags.contains("s_down") || flags.contains("disconnected")) {
slaveDown(ip, port);
continue;
}
String slaveAddr = createAddress(ip, port);
currentSlaves.add(slaveAddr);
RFuture<Void> slaveFuture = addSlave(ip, port, slaveAddr);
futures.add(slaveFuture);
}
CountableListener<Void> listener = new CountableListener<Void>() {
@Override
protected void onSuccess(Void value) {
Set<String> removedSlaves = new HashSet<String>(slaves);
removedSlaves.removeAll(currentSlaves);
for (String slave : removedSlaves) {
slaves.remove(slave);
String[] parts = slave.replace("redis://", "").split(":");
slaveDown(parts[0], parts[1]);
}
};
};
listener.setCounter(futures.size());
for (RFuture<Void> f : futures) {
f.addListener(listener);
}
}
});
slavesFuture.addListener(commonListener);
} }
RFuture<List<Map<String, String>>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
sentinelsFuture.addListener(new FutureListener<List<Map<String, String>>>() {
@Override
public void operationComplete(Future<List<Map<String, String>>> future) throws Exception {
if (!future.isSuccess()) {
return;
}
List<Map<String, String>> list = future.getNow();
for (Map<String, String> map : list) {
if (map.isEmpty()) {
continue;
}
String ip = map.get("ip");
String port = map.get("port");
String host = createAddress(ip, port);
URI sentinelAddr = URIBuilder.create(host);
registerSentinel(cfg, sentinelAddr, getConfig());
}
}
});
sentinelsFuture.addListener(commonListener);
} }
private String createAddress(String host, Object port) { private String createAddress(String host, Object port) {
if (host.contains(":")) { if (host.contains(":")) {
host = "[" + host + "]"; host = "[" + host + "]";
@ -157,9 +350,15 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return entry; return entry;
} }
private RFuture<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { private RFuture<Void> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) {
RedisClient client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts()); String key = addr.getHost() + ":" + addr.getPort();
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client); RedisClient client = sentinels.get(key);
if (client != null) {
return RedissonPromise.newSucceededFuture(null);
}
client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
RedisClient oldClient = sentinels.putIfAbsent(key, client);
if (oldClient != null) { if (oldClient != null) {
return RedissonPromise.newSucceededFuture(null); return RedissonPromise.newSucceededFuture(null);
} }
@ -178,6 +377,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override @Override
public void onMessage(String channel, Object msg) { public void onMessage(String channel, Object msg) {
log.debug("message {} from {}", msg, channel);
if ("+sentinel".equals(channel)) { if ("+sentinel".equals(channel)) {
onSentinelAdded(cfg, (String) msg, c); onSentinelAdded(cfg, (String) msg, c);
} }
@ -209,7 +410,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
}); });
return pubsubFuture; return RedissonPromise.newSucceededFuture(null);
} }
protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) { protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) {
@ -228,42 +429,50 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (parts.length > 4 if (parts.length > 4
&& "slave".equals(parts[0])) { && "slave".equals(parts[0])) {
final String ip = parts[2]; String ip = parts[2];
final String port = parts[3]; String port = parts[3];
final String slaveAddr = createAddress(ip, port);
if (!isUseSameMaster(parts)) { if (!isUseSameMaster(parts)) {
return; return;
} }
// to avoid addition twice String slaveAddr = createAddress(ip, port);
if (slaves.putIfAbsent(slaveAddr, true) == null && !config.checkSkipSlavesInit()) { addSlave(ip, port, slaveAddr);
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); } else {
RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr)); log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
future.addListener(new FutureListener<Void>() { }
@Override }
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
slaves.remove(slaveAddr);
log.error("Can't add slave: " + slaveAddr, future.cause());
return;
}
URI uri = convert(ip, port); protected RFuture<Void> addSlave(final String ip, final String port, final String slaveAddr) {
if (entry.slaveUp(uri, FreezeReason.MANAGER)) { final RPromise<Void> result = new RedissonPromise<Void>();
String slaveAddr = ip + ":" + port; // to avoid addition twice
log.info("slave: {} added", slaveAddr); if (slaves.add(slaveAddr) && !config.checkSkipSlavesInit()) {
} final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr));
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
slaves.remove(slaveAddr);
result.tryFailure(future.cause());
log.error("Can't add slave: " + slaveAddr, future.cause());
return;
} }
}); URI uri = convert(ip, port);
} else { if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
slaveUp(ip, port); String slaveAddr = ip + ":" + port;
} log.info("slave: {} added", slaveAddr);
result.trySuccess(null);
}
}
});
} else { } else {
log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); slaveUp(ip, port);
result.trySuccess(null);
} }
return result;
} }
protected URI convert(String ip, String port) { protected URI convert(String ip, String port) {
@ -322,15 +531,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
private boolean isUseSameMaster(String[] parts) { private boolean isUseSameMaster(String[] parts) {
String ip = parts[2]; return isUseSameMaster(parts[2], parts[3], parts[6], parts[7]);
String port = parts[3]; }
String slaveAddr = ip + ":" + port;
protected boolean isUseSameMaster(String slaveIp, String slavePort, String slaveMasterHost, String slaveMasterPort) {
String master = currentMaster.get(); String master = currentMaster.get();
String slaveMaster = createAddress(parts[6], parts[7]); String slaveMaster = createAddress(slaveMasterHost, slaveMasterPort);
if (!master.equals(slaveMaster)) { if (!master.equals(slaveMaster)) {
log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMaster, master); log.warn("Skipped slave up {} for master {} differs from current {}", slaveIp + ":" + slavePort, slaveMaster, master);
return false; return false;
} }
return true; return true;
@ -410,6 +618,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override @Override
public void shutdown() { public void shutdown() {
monitorFuture.cancel(true);
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(); List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (RedisClient sentinel : sentinels.values()) { for (RedisClient sentinel : sentinels.values()) {
RFuture<Void> future = sentinel.shutdownAsync(); RFuture<Void> future = sentinel.shutdownAsync();

@ -87,12 +87,10 @@ public class LoadBalancerManager {
RPromise<Void> result = new RedissonPromise<Void>(); RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null) { CountableListener<Void> listener = new CountableListener<Void>(result, null) {
public void operationComplete(io.netty.util.concurrent.Future<Object> future) throws Exception { @Override
super.operationComplete(future); protected void onSuccess(Void value) {
if (this.result.isSuccess()) { client2Entry.put(entry.getClient(), entry);
client2Entry.put(entry.getClient(), entry); }
}
};
}; };
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry); RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);

@ -19,6 +19,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
* *
@ -34,19 +35,30 @@ public class AsyncSemaphore {
counter = permits; counter = permits;
} }
public void acquireUninterruptibly() { public boolean tryAcquire(long timeoutMillis) {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
acquire(new Runnable() { final Runnable listener = new Runnable() {
@Override @Override
public void run() { public void run() {
latch.countDown(); latch.countDown();
} }
}); };
acquire(listener);
try { try {
latch.await(); boolean res = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
if (!res) {
if (!remove(listener)) {
release();
}
}
return res;
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
if (!remove(listener)) {
release();
}
return false;
} }
} }

@ -27,8 +27,10 @@ import org.redisson.api.RPatternTopicReactive;
import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.PatternStatusListener; import org.redisson.api.listener.PatternStatusListener;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.PubSubConnectionEntry; import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
@ -99,11 +101,19 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
} }
}); });
} }
protected void acquire(AsyncSemaphore semaphore) {
MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!semaphore.tryAcquire(timeout)) {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
}
}
@Override @Override
public void removeListener(int listenerId) { public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly(); acquire(semaphore);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {

@ -277,7 +277,7 @@ public class RedisRunner {
try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) { try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) {
args.stream().forEach((arg) -> { args.stream().forEach((arg) -> {
if (arg.contains("--")) { if (arg.contains("--")) {
printer.println(arg.replace("--", "\n\r")); printer.println(arg.replace("--", ""));
} }
}); });
} }
@ -431,7 +431,7 @@ public class RedisRunner {
public RedisRunner nosave() { public RedisRunner nosave() {
this.nosave = true; this.nosave = true;
options.remove(REDIS_OPTIONS.SAVE); options.remove(REDIS_OPTIONS.SAVE);
addConfigOption(REDIS_OPTIONS.SAVE, "''"); // addConfigOption(REDIS_OPTIONS.SAVE, "''");
return this; return this;
} }
@ -472,7 +472,9 @@ public class RedisRunner {
this.randomDir = true; this.randomDir = true;
options.remove(REDIS_OPTIONS.DIR); options.remove(REDIS_OPTIONS.DIR);
makeRandomDefaultDir(); makeRandomDefaultDir();
addConfigOption(REDIS_OPTIONS.DIR, defaultDir);
addConfigOption(REDIS_OPTIONS.DIR, "\"" + defaultDir + "\"");
return this; return this;
} }
@ -868,6 +870,9 @@ public class RedisRunner {
System.out.println("REDIS RUNNER: Making directory " + f.getAbsolutePath()); System.out.println("REDIS RUNNER: Making directory " + f.getAbsolutePath());
f.mkdirs(); f.mkdirs();
this.defaultDir = f.getAbsolutePath(); this.defaultDir = f.getAbsolutePath();
if (RedissonRuntimeEnvironment.isWindows) {
defaultDir = defaultDir.replace("\\", "\\\\");
}
} }
} }

@ -49,6 +49,22 @@ public class RedissonBatchTest extends BaseTest {
List<?> t = batch.execute(); List<?> t = batch.execute();
System.out.println(t); System.out.println(t);
} }
@Test
public void testBigRequestAtomic() {
RBatch batch = redisson.createBatch();
batch.atomic();
batch.timeout(15, TimeUnit.SECONDS);
batch.retryInterval(1, TimeUnit.SECONDS);
batch.retryAttempts(5);
for (int i = 0; i < 100; i++) {
batch.getBucket("" + i).setAsync(i);
batch.getBucket("" + i).getAsync();
}
BatchResult<?> s = batch.execute();
assertThat(s.getResponses().size()).isEqualTo(200);
}
@Test @Test
public void testSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException { public void testSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException {

@ -514,6 +514,138 @@ public class RedissonTopicTest {
runner.stop(); runner.stop();
} }
@Test
public void testReattachInSentinel() throws Exception {
RedisRunner.RedisProcess master = new RedisRunner()
.nosave()
.randomDir()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers().addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RTopic<Integer> topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(new MessageListener<Integer>() {
@Override
public void onMessage(String channel, Integer msg) {
executed.set(true);
}
});
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
Thread.sleep(TimeUnit.SECONDS.toMillis(20));
master = new RedisRunner()
.port(6390)
.nosave()
.randomDir()
.run();
slave1 = new RedisRunner()
.port(6391)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6390)
.run();
slave2 = new RedisRunner()
.port(6392)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6390)
.run();
sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6390, 2)
.run();
sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6390, 2)
.run();
sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6390, 2)
.run();
redisson.getTopic("topic").publish(1);
await().atMost(10, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
Assert.assertTrue(executed.get());
Thread.sleep(1000000);
redisson.shutdown();
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
}
@Test @Test
public void testReattachInCluster() throws Exception { public void testReattachInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();

Loading…
Cancel
Save