Memory allocation optimization. #338

pull/365/head
Nikita 9 years ago
parent f45244b979
commit fac418e9f6

@ -15,12 +15,37 @@
*/ */
package org.redisson.command; package org.redisson.command;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class AsyncDetails<V, R> {
static final ConcurrentLinkedQueue<AsyncDetails> queue = new ConcurrentLinkedQueue<AsyncDetails>();
Future<RedisConnection> connectionFuture;
ConnectionManager connectionManager;
Promise<R> attemptPromise;
boolean readOnlyMode;
NodeSource source;
MultiDecoder<Object> messageDecoder;
Codec codec;
RedisCommand<V> command;
Object[] params;
Promise<R> mainPromise;
int attempt;
public class AsyncDetails {
private volatile ChannelFuture writeFuture; private volatile ChannelFuture writeFuture;
@ -28,6 +53,42 @@ public class AsyncDetails {
private volatile Timeout timeout; private volatile Timeout timeout;
public AsyncDetails() {
}
public static AsyncDetails acquire() {
// AsyncDetails result = queue.poll();
// if (result != null) {
// return result;
// }
AsyncDetails details = new AsyncDetails();
return details;
}
public static void release(AsyncDetails details) {
// queue.add(details);
}
public void init(Future<RedisConnection> connectionFuture,
Promise<R> attemptPromise, boolean readOnlyMode, NodeSource source,
Codec codec, RedisCommand<V> command, Object[] params,
Promise<R> mainPromise, int attempt) {
this.connectionFuture = connectionFuture;
this.attemptPromise = attemptPromise;
this.readOnlyMode = readOnlyMode;
this.source = source;
this.messageDecoder = messageDecoder;
this.codec = codec;
this.command = command;
this.params = params;
this.mainPromise = mainPromise;
this.attempt = attempt;
this.writeFuture = writeFuture;
this.exception = exception;
this.timeout = timeout;
}
public ChannelFuture getWriteFuture() { public ChannelFuture getWriteFuture() {
return writeFuture; return writeFuture;
} }
@ -49,4 +110,46 @@ public class AsyncDetails {
this.timeout = timeout; this.timeout = timeout;
} }
public Future<RedisConnection> getConnectionFuture() {
return connectionFuture;
}
public Promise<R> getAttemptPromise() {
return attemptPromise;
}
public boolean isReadOnlyMode() {
return readOnlyMode;
}
public NodeSource getSource() {
return source;
}
public MultiDecoder<Object> getMessageDecoder() {
return messageDecoder;
}
public Codec getCodec() {
return codec;
}
public RedisCommand<V> getCommand() {
return command;
}
public Object[] getParams() {
return params;
}
public Promise<R> getMainPromise() {
return mainPromise;
}
public int getAttempt() {
return attempt;
}
} }

@ -38,7 +38,6 @@ import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
@ -88,7 +87,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot, client), null, codec, command, params, mainPromise, 0); async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
@ -126,7 +125,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}; };
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0); async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0);
} }
return mainPromise; return mainPromise;
} }
@ -164,7 +163,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}); });
ClusterSlotRange slot = slots.remove(0); ClusterSlotRange slot = slots.remove(0);
async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, 0); async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, attemptPromise, 0);
} }
@Override @Override
@ -203,29 +202,38 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
}; };
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0); async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0);
} }
return mainPromise; return mainPromise;
} }
protected <V> RedisException convertException(Future<V> future) { public <V> RedisException convertException(Future<V> future) {
return future.cause() instanceof RedisException ? return future.cause() instanceof RedisException ?
(RedisException) future.cause() : (RedisException) future.cause() :
new RedisException("Unexpected exception while processing command", future.cause()); new RedisException("Unexpected exception while processing command", future.cause());
} }
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
NodeSource source = NodeSource.ZERO;
if (slot != 0) {
source = new NodeSource(slot);
}
return source;
}
@Override @Override
public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); NodeSource source = getNodeSource(key);
async(true, new NodeSource(slot), null, codec, command, params, mainPromise, 0); async(true, source, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
@Override @Override
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0); async(false, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
@ -236,8 +244,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
int slot = connectionManager.calcSlot(key); NodeSource source = getNodeSource(key);
return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params); return evalAsync(source, true, codec, evalCommandType, script, keys, params);
} }
@Override @Override
@ -248,8 +256,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
int slot = connectionManager.calcSlot(key); NodeSource source = getNodeSource(key);
return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params); return evalAsync(source, false, codec, evalCommandType, script, keys, params);
} }
@Override @Override
@ -284,7 +292,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, 0); async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, args.toArray(), promise, 0);
} }
return mainPromise; return mainPromise;
} }
@ -296,7 +304,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.add(keys.size()); args.add(keys.size());
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, null, codec, evalCommandType, args.toArray(), mainPromise, 0); async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise; return mainPromise;
} }
@ -308,12 +316,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); NodeSource source = getNodeSource(key);
async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0); async(false, source, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec, protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final Promise<R> mainPromise, final int attempt) { final RedisCommand<V> command, final Object[] params, final Promise<R> mainPromise, final int attempt) {
if (mainPromise.isCancelled()) { if (mainPromise.isCancelled()) {
return; return;
@ -326,8 +334,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
final Promise<R> attemptPromise = connectionManager.newPromise(); final Promise<R> attemptPromise = connectionManager.newPromise();
final AsyncDetails details = new AsyncDetails();
final Future<RedisConnection> connectionFuture; final Future<RedisConnection> connectionFuture;
if (readOnlyMode) { if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command); connectionFuture = connectionManager.connectionReadOp(source, command);
@ -335,17 +341,22 @@ public class CommandAsyncService implements CommandAsyncExecutor {
connectionFuture = connectionManager.connectionWriteOp(source, command); connectionFuture = connectionManager.connectionWriteOp(source, command);
} }
final AsyncDetails<V, R> details = AsyncDetails.acquire();
details.init(connectionFuture, attemptPromise,
readOnlyMode, source, codec, command, params, mainPromise, attempt);
final TimerTask retryTimerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout t) throws Exception { public void run(Timeout t) throws Exception {
if (attemptPromise.isDone()) { if (details.getAttemptPromise().isDone()) {
return; return;
} }
if (connectionFuture.cancel(false)) { if (details.getConnectionFuture().cancel(false)) {
connectionManager.getShutdownLatch().release(); connectionManager.getShutdownLatch().release();
} else { } else {
if (connectionFuture.isSuccess()) { if (details.getConnectionFuture().isSuccess()) {
ChannelFuture writeFuture = details.getWriteFuture(); ChannelFuture writeFuture = details.getWriteFuture();
if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) { if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) {
return; return;
@ -353,146 +364,203 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
} }
if (mainPromise.isCancelled()) { if (details.getMainPromise().isCancelled()) {
attemptPromise.cancel(false); if (details.getAttemptPromise().cancel(false)) {
AsyncDetails.release(details);
}
return; return;
} }
if (attempt == connectionManager.getConfig().getRetryAttempts()) { if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
if (details.getException() == null) { if (details.getException() == null) {
details.setException(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params))); details.setException(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(details.getParams())));
} }
attemptPromise.tryFailure(details.getException()); details.getAttemptPromise().tryFailure(details.getException());
return; return;
} }
if (!attemptPromise.cancel(false)) { if (!details.getAttemptPromise().cancel(false)) {
return; return;
} }
int count = attempt + 1; int count = details.getAttempt() + 1;
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, count); async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
AsyncDetails.release(details);
} }
}; };
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout); details.setTimeout(timeout);
connectionFuture.addListener(new FutureListener<RedisConnection>() { if (connectionFuture.isDone()) {
@Override checkConnectionFuture(source, details);
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { } else {
if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { connectionFuture.addListener(new FutureListener<RedisConnection>() {
return; @Override
} public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(source, details);
if (!connFuture.isSuccess()) {
details.setException(convertException(connFuture));
return;
} }
});
}
final RedisConnection connection = connFuture.getNow(); if (attemptPromise.isDone()) {
checkAttemptFuture(source, details, attemptPromise);
} else {
attemptPromise.addListener(new FutureListener<R>() {
if (source.getRedirect() == Redirect.ASK) { @Override
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2); public void operationComplete(Future<R> future) throws Exception {
Promise<Void> promise = connectionManager.newPromise(); checkAttemptFuture(source, details, future);
list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {}));
list.add(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
Promise<Void> main = connectionManager.newPromise();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
} else {
log.debug("aquired connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
details.setWriteFuture(future);
} }
});
}
details.getWriteFuture().addListener(new ChannelFutureListener() { }
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled()) {
return;
}
if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException(
"Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause()));
return;
}
details.getTimeout().cancel(); private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (details.getAttemptPromise().isDone() || future.isCancelled()) {
return;
}
int timeoutTime = connectionManager.getConfig().getTimeout(); if (!future.isSuccess()) {
if (command.getName().equals(RedisCommands.BLPOP_VALUE.getName()) details.setException(new WriteRedisConnectionException(
|| command.getName().equals(RedisCommands.BRPOP_VALUE.getName())) { "Can't write command: " + details.getCommand() + ", params: " + Arrays.toString(details.getParams()) + " to channel: " + future.channel(), future.cause()));
Integer popTimeout = Integer.valueOf(params[params.length - 1].toString()); return;
if (popTimeout == 0) { }
return;
}
timeoutTime += popTimeout*1000;
}
final int timeoutAmount = timeoutTime; details.getTimeout().cancel();
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + command
+ " with params: " + Arrays.toString(params) + " channel: " + connection.getChannel()));
}
};
Timeout timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
}
});
if (readOnlyMode) { int timeoutTime = connectionManager.getConfig().getTimeout();
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection)); if (details.getCommand().getName().equals(RedisCommands.BLPOP_VALUE.getName())
} else { || details.getCommand().getName().equals(RedisCommands.BRPOP_VALUE.getName())) {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection)); Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString());
} if (popTimeout == 0) {
return;
} }
}); timeoutTime += popTimeout*1000;
}
attemptPromise.addListener(new FutureListener<R>() { final int timeoutAmount = timeoutTime;
TimerTask timeoutTask = new TimerTask() {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void run(Timeout timeout) throws Exception {
details.getTimeout().cancel(); details.getAttemptPromise().tryFailure(
if (future.isCancelled()) { new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand()
return; + " with params: " + Arrays.toString(details.getParams()) + " channel: " + connection.getChannel()));
} }
};
if (future.cause() instanceof RedisMovedException) { Timeout timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
RedisMovedException ex = (RedisMovedException)future.cause(); details.setTimeout(timeout);
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt); }
return;
}
if (future.cause() instanceof RedisAskException) { private <R, V> void checkConnectionFuture(final NodeSource source,
RedisAskException ex = (RedisAskException)future.cause(); final AsyncDetails<V, R> details) {
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt); if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) {
return; return;
}
if (!details.getConnectionFuture().isSuccess()) {
details.setException(convertException(details.getConnectionFuture()));
return;
}
final RedisConnection connection = details.getConnectionFuture().getNow();
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[] {}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getMessageDecoder(), details.getCodec(), details.getCommand(), details.getParams()));
Promise<Void> main = connectionManager.newPromise();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
} else {
log.debug("aquired connection for command {} from slot {} using node {}", details.getCommand(), details.getSource(), connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getMessageDecoder(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}
if (details.getWriteFuture().isDone()) {
checkWriteFuture(details, connection);
} else {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(details, connection);
} }
});
}
if (future.cause() instanceof RedisLoadingException) { if (details.getAttemptPromise().isDone()) {
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt); releaseConnection(source, details, connection);
return; } else {
details.getAttemptPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<R> future) throws Exception {
releaseConnection(source, details, connection);
} }
});
}
}
if (future.isSuccess()) { private <V, R> void releaseConnection(NodeSource source, AsyncDetails<V, R> details, RedisConnection connection) {
R res = future.getNow(); if (details.getAttemptPromise().isCancelled()) {
if (res instanceof RedisClientResult) { return;
InetSocketAddress addr = source.getAddr(); }
if (addr == null) {
addr = connectionFuture.getNow().getRedisClient().getAddr(); connectionManager.getShutdownLatch().release();
} if (details.isReadOnlyMode()) {
((RedisClientResult)res).setRedisClient(addr); connectionManager.releaseRead(source, connection);
} } else {
mainPromise.setSuccess(res); connectionManager.releaseWrite(source, connection);
} else { }
mainPromise.setFailure(future.cause()); }
private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details,
Future<R> future) {
details.getTimeout().cancel();
if (future.isCancelled()) {
return;
}
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details);
return;
}
if (future.isSuccess()) {
R res = future.getNow();
if (res instanceof RedisClientResult) {
InetSocketAddress addr = source.getAddr();
if (addr == null) {
addr = details.getConnectionFuture().getNow().getRedisClient().getAddr();
} }
((RedisClientResult)res).setRedisClient(addr);
} }
}); details.getMainPromise().setSuccess(res);
} else {
details.getMainPromise().setFailure(future.cause());
}
AsyncDetails.release(details);
} }
} }

@ -104,7 +104,7 @@ public class CommandBatchService extends CommandReactiveService {
} }
@Override @Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder<Object> messageDecoder, protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) { Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already executed!");
@ -121,7 +121,7 @@ public class CommandBatchService extends CommandReactiveService {
if (!readOnlyMode) { if (!readOnlyMode) {
entry.setReadOnlyMode(false); entry.setReadOnlyMode(false);
} }
entry.getCommands().add(new CommandEntry(new CommandData<V, R>(mainPromise, messageDecoder, codec, command, params), index.incrementAndGet())); entry.getCommands().add(new CommandEntry(new CommandData<V, R>(mainPromise, codec, command, params), index.incrementAndGet()));
} }
public List<?> execute() { public List<?> execute() {
@ -222,7 +222,7 @@ public class CommandBatchService extends CommandReactiveService {
final TimerTask retryTimerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone() || mainPromise.isDone()) { if (attemptPromise.isDone()) {
return; return;
} }

@ -172,6 +172,11 @@ public class ClientConnectionsEntry {
} }
private <T extends RedisConnection> void addFireEventListener(Promise<T> connectionFuture) { private <T extends RedisConnection> void addFireEventListener(Promise<T> connectionFuture) {
if (connectionFuture.isSuccess()) {
connectionManager.getConnectionEventsHub().fireConnect(connectionFuture.getNow().getRedisClient().getAddr());
return;
}
connectionFuture.addListener(new FutureListener<T>() { connectionFuture.addListener(new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {

@ -27,7 +27,6 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.AsyncDetails;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
@ -85,8 +84,10 @@ public interface ConnectionManager {
Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command); Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
@Deprecated
<T> FutureListener<T> createReleaseReadListener(NodeSource source, RedisConnection conn); <T> FutureListener<T> createReleaseReadListener(NodeSource source, RedisConnection conn);
@Deprecated
<T> FutureListener<T> createReleaseWriteListener(NodeSource source, RedisConnection conn); <T> FutureListener<T> createReleaseWriteListener(NodeSource source, RedisConnection conn);
RedisClient createClient(String host, int port, int timeout); RedisClient createClient(String host, int port, int timeout);

@ -19,6 +19,8 @@ import java.net.InetSocketAddress;
public class NodeSource { public class NodeSource {
public static final NodeSource ZERO = new NodeSource(0);
public enum Redirect {MOVED, ASK} public enum Redirect {MOVED, ASK}
private final Integer slot; private final Integer slot;

Loading…
Cancel
Save