ASK handling. #264

pull/282/head
Nikita 9 years ago
parent f8d3b5612e
commit 8279398720

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
@ -36,6 +37,7 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -102,15 +104,15 @@ public class CommandBatchExecutorService extends CommandExecutorService {
} }
@Override @Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder, protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, RedisClient client, int attempt) { Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, RedisClient client, int attempt) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already executed!");
} }
Entry entry = commands.get(slot); Entry entry = commands.get(nodeSource.getSlot());
if (entry == null) { if (entry == null) {
entry = new Entry(); entry = new Entry();
Entry oldEntry = commands.putIfAbsent(slot, entry); Entry oldEntry = commands.putIfAbsent(nodeSource.getSlot(), entry);
if (oldEntry != null) { if (oldEntry != null) {
entry = oldEntry; entry = oldEntry;
} }
@ -143,6 +145,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
promise.setFailure(future.cause()); promise.setFailure(future.cause());
commands = null;
return; return;
} }
@ -162,12 +165,12 @@ public class CommandBatchExecutorService extends CommandExecutorService {
AtomicInteger slots = new AtomicInteger(commands.size()); AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) { for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
execute(e.getValue(), e.getKey(), voidPromise, slots, 0); execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0);
} }
return promise; return promise;
} }
public void execute(final Entry entry, final int slot, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) { public void execute(final Entry entry, final NodeSource source, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) { if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return; return;
@ -189,15 +192,15 @@ public class CommandBatchExecutorService extends CommandExecutorService {
attemptPromise.cancel(true); attemptPromise.cancel(true);
int count = attempt + 1; int count = attempt + 1;
execute(entry, slot, mainPromise, slots, count); execute(entry, source, mainPromise, slots, count);
} }
}; };
Future<RedisConnection> connectionFuture; Future<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) { if (entry.isReadOnlyMode()) {
connectionFuture = connectionManager.connectionReadOp(slot, null); connectionFuture = connectionManager.connectionReadOp(source, null);
} else { } else {
connectionFuture = connectionManager.connectionWriteOp(slot, null); connectionFuture = connectionManager.connectionWriteOp(source, null);
} }
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@ -235,9 +238,9 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}); });
if (entry.isReadOnlyMode()) { if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout));
} else { } else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout));
} }
} }
}); });
@ -251,7 +254,12 @@ public class CommandBatchExecutorService extends CommandExecutorService {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
execute(entry, ex.getSlot(), mainPromise, slots, attempt); execute(entry, new NodeSource(ex.getSlot()), mainPromise, slots, attempt);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
execute(entry, new NodeSource(ex.getAddr()), mainPromise, slots, attempt);
return; return;
} }

@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.CommandBatchExecutorService.CommandEntry;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
@ -34,10 +36,13 @@ import org.redisson.client.RedisTimeoutException;
import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -99,7 +104,7 @@ public class CommandExecutorService implements CommandExecutor {
}; };
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0); async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0);
} }
return mainPromise; return mainPromise;
} }
@ -136,7 +141,7 @@ public class CommandExecutorService implements CommandExecutor {
}); });
ClusterSlotRange slot = slots.remove(0); ClusterSlotRange slot = slots.remove(0);
async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0);
} }
public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) { public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
@ -173,7 +178,7 @@ public class CommandExecutorService implements CommandExecutor {
} }
}; };
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0); async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0);
} }
return mainPromise; return mainPromise;
} }
@ -215,14 +220,14 @@ public class CommandExecutorService implements CommandExecutor {
public <T, R> Future<R> readAsync(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, client, 0); async(true, new NodeSource(slot), null, codec, command, params, mainPromise, client, 0);
return mainPromise; return mainPromise;
} }
public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, null, 0); async(true, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0);
return mainPromise; return mainPromise;
} }
@ -233,7 +238,7 @@ public class CommandExecutorService implements CommandExecutor {
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
async(false, slot, null, codec, command, params, mainPromise, null, 0); async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0);
return mainPromise; return mainPromise;
} }
@ -243,15 +248,15 @@ public class CommandExecutorService implements CommandExecutor {
public <R> R write(String key, Codec codec, SyncOperation<R> operation) { public <R> R write(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
return async(false, codec, slot, operation, 0); return async(false, codec, new NodeSource(slot), operation, 0);
} }
public <R> R read(String key, Codec codec, SyncOperation<R> operation) { public <R> R read(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
return async(true, codec, slot, operation, 0); return async(true, codec, new NodeSource(slot), operation, 0);
} }
private <R> R async(boolean readOnlyMode, Codec codec, int slot, SyncOperation<R> operation, int attempt) { private <R> R async(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) { if (!connectionManager.getShutdownLatch().acquire()) {
return null; return null;
} }
@ -259,9 +264,9 @@ public class CommandExecutorService implements CommandExecutor {
try { try {
Future<RedisConnection> connectionFuture; Future<RedisConnection> connectionFuture;
if (readOnlyMode) { if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(slot, null); connectionFuture = connectionManager.connectionReadOp(source, null);
} else { } else {
connectionFuture = connectionManager.connectionWriteOp(slot, null); connectionFuture = connectionManager.connectionWriteOp(source, null);
} }
connectionFuture.syncUninterruptibly(); connectionFuture.syncUninterruptibly();
@ -270,19 +275,21 @@ public class CommandExecutorService implements CommandExecutor {
try { try {
return operation.execute(codec, connection); return operation.execute(codec, connection);
} catch (RedisMovedException e) { } catch (RedisMovedException e) {
return async(readOnlyMode, codec, e.getSlot(), operation, attempt); return async(readOnlyMode, codec, new NodeSource(e.getSlot()), operation, attempt);
} catch (RedisAskException e) {
return async(readOnlyMode, codec, new NodeSource(e.getAddr()), operation, attempt);
} catch (RedisTimeoutException e) { } catch (RedisTimeoutException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) { if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e; throw e;
} }
attempt++; attempt++;
return async(readOnlyMode, codec, slot, operation, attempt); return async(readOnlyMode, codec, source, operation, attempt);
} finally { } finally {
connectionManager.getShutdownLatch().release(); connectionManager.getShutdownLatch().release();
if (readOnlyMode) { if (readOnlyMode) {
connectionManager.releaseRead(slot, connection); connectionManager.releaseRead(source, connection);
} else { } else {
connectionManager.releaseWrite(slot, connection); connectionManager.releaseWrite(source, connection);
} }
} }
} catch (RedisException e) { } catch (RedisException e) {
@ -295,7 +302,7 @@ public class CommandExecutorService implements CommandExecutor {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
attempt++; attempt++;
return async(readOnlyMode, codec, slot, operation, attempt); return async(readOnlyMode, codec, source, operation, attempt);
} }
} }
@ -355,7 +362,7 @@ public class CommandExecutorService implements CommandExecutor {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0);
} }
return mainPromise; return mainPromise;
} }
@ -368,7 +375,7 @@ public class CommandExecutorService implements CommandExecutor {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, null, 0); async(readOnlyMode, new NodeSource(slot), null, codec, evalCommandType, args.toArray(), mainPromise, null, 0);
return mainPromise; return mainPromise;
} }
@ -398,11 +405,11 @@ public class CommandExecutorService implements CommandExecutor {
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(false, slot, null, codec, command, params, mainPromise, null, 0); async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0);
return mainPromise; return mainPromise;
} }
protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command, protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final RedisClient client, final int attempt) { final Object[] params, final Promise<R> mainPromise, final RedisClient client, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) { if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
@ -427,7 +434,7 @@ public class CommandExecutorService implements CommandExecutor {
} }
int count = attempt + 1; int count = attempt + 1;
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, client, count); async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, client, count);
} }
}; };
@ -437,12 +444,12 @@ public class CommandExecutorService implements CommandExecutor {
Future<RedisConnection> connectionFuture; Future<RedisConnection> connectionFuture;
if (readOnlyMode) { if (readOnlyMode) {
if (client != null) { if (client != null) {
connectionFuture = connectionManager.connectionReadOp(slot, command, client); connectionFuture = connectionManager.connectionReadOp(source, command, client);
} else { } else {
connectionFuture = connectionManager.connectionReadOp(slot, command); connectionFuture = connectionManager.connectionReadOp(source, command);
} }
} else { } else {
connectionFuture = connectionManager.connectionWriteOp(slot, command); connectionFuture = connectionManager.connectionWriteOp(source, command);
} }
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@ -463,8 +470,20 @@ public class CommandExecutorService implements CommandExecutor {
RedisConnection connection = connFuture.getNow(); RedisConnection connection = connFuture.getNow();
log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr()); ChannelFuture future = null;
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params)); if (source.getAddr() != null) {
// ASK handling
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {}));
list.add(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
Promise<Void> main = connectionManager.newPromise();
future = connection.send(new CommandsData(main, list));
} else {
log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr());
future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
}
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
@ -481,9 +500,9 @@ public class CommandExecutorService implements CommandExecutor {
}); });
if (readOnlyMode) { if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout));
} else { } else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout));
} }
} }
}); });
@ -503,7 +522,18 @@ public class CommandExecutorService implements CommandExecutor {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, client, attempt); async(readOnlyMode, new NodeSource(ex.getSlot()), messageDecoder, codec, command, params, mainPromise, client, attempt);
return;
}
if (future.cause() instanceof RedisAskException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
RedisAskException ex = (RedisAskException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getAddr()), messageDecoder, codec, command, params, mainPromise, client, attempt);
return; return;
} }

@ -77,7 +77,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override @Override
public void removeListener(int listenerId) { public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {
return; return;
} }

@ -92,7 +92,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override @Override
public void removeListener(int listenerId) { public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) { if (entry == null) {
return; return;
} }

@ -0,0 +1,39 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI;
public class RedisAskException extends RedisException {
private static final long serialVersionUID = -6969734163155547631L;
private URI url;
public RedisAskException(String url) {
this.url = URI.create("//" + url);
}
public URI getUrl() {
return url;
}
public InetSocketAddress getAddr() {
return new InetSocketAddress(url.getHost(), url.getPort());
}
}

@ -15,19 +15,12 @@
*/ */
package org.redisson.client; package org.redisson.client;
public class RedisEmptySlotException extends RedisException { public class RedisNodeNotFoundException extends RedisException {
private static final long serialVersionUID = -4756928186967834601L; private static final long serialVersionUID = -4756928186967834601L;
private final int slot; public RedisNodeNotFoundException(String msg) {
public RedisEmptySlotException(String msg, int slot) {
super(msg); super(msg);
this.slot = slot;
}
public int getSlot() {
return slot;
} }
} }

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
@ -160,10 +161,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String[] errorParts = error.split(" "); String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]); int slot = Integer.valueOf(errorParts[1]);
data.getPromise().setFailure(new RedisMovedException(slot)); data.getPromise().setFailure(new RedisMovedException(slot));
} else if (error.startsWith("(error) ASK")) { } else if (error.startsWith("ASK")) {
String[] errorParts = error.split(" "); String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[2]); data.getPromise().setFailure(new RedisAskException(errorParts[2]));
data.getPromise().setFailure(new RedisMovedException(slot));
} else { } else {
data.getPromise().setFailure(new RedisException(error + ". channel: " + channel + " command: " + data)); data.getPromise().setFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
} }

@ -49,6 +49,8 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
public interface RedisCommands { public interface RedisCommands {
RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3); RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2); RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2);
RedisStrictCommand<Integer> ZCARD = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor()); RedisStrictCommand<Integer> ZCARD = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());

@ -15,6 +15,7 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -60,27 +61,29 @@ public interface ConnectionManager {
<R> Promise<R> newPromise(); <R> Promise<R> newPromise();
void releaseRead(int slot, RedisConnection connection); void releaseRead(NodeSource source, RedisConnection connection);
void releaseWrite(int slot, RedisConnection connection); void releaseWrite(NodeSource source, RedisConnection connection);
Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command); Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command);
Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command, RedisClient client); Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command, RedisClient client);
Future<RedisConnection> connectionWriteOp(int slot, RedisCommand<?> command); Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
<T> FutureListener<T> createReleaseReadListener(int slot, <T> FutureListener<T> createReleaseReadListener(NodeSource source,
RedisConnection conn, Timeout timeout); RedisConnection conn, Timeout timeout);
<T> FutureListener<T> createReleaseWriteListener(int slot, <T> FutureListener<T> createReleaseWriteListener(NodeSource source,
RedisConnection conn, Timeout timeout); RedisConnection conn, Timeout timeout);
RedisClient createClient(String host, int port, int timeout); RedisClient createClient(String host, int port, int timeout);
RedisClient createClient(String host, int port); RedisClient createClient(String host, int port);
PubSubConnectionEntry getEntry(String channelName); MasterSlaveEntry getEntry(InetSocketAddress addr);
PubSubConnectionEntry getPubSubEntry(String channelName);
Future<PubSubConnectionEntry> subscribe(String channelName, Codec codec); Future<PubSubConnectionEntry> subscribe(String channelName, Codec codec);

@ -15,6 +15,7 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -29,7 +30,7 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisEmptySlotException; import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -168,7 +169,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
@Override @Override
public <T> FutureListener<T> createReleaseWriteListener(final int slot, public <T> FutureListener<T> createReleaseWriteListener(final NodeSource source,
final RedisConnection conn, final Timeout timeout) { final RedisConnection conn, final Timeout timeout) {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
@ -181,13 +182,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
shutdownLatch.release(); shutdownLatch.release();
timeout.cancel(); timeout.cancel();
releaseWrite(slot, conn); releaseWrite(source, conn);
} }
}; };
} }
@Override @Override
public <T> FutureListener<T> createReleaseReadListener(final int slot, public <T> FutureListener<T> createReleaseReadListener(final NodeSource source,
final RedisConnection conn, final Timeout timeout) { final RedisConnection conn, final Timeout timeout) {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
@ -200,7 +201,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
shutdownLatch.release(); shutdownLatch.release();
timeout.cancel(); timeout.cancel();
releaseRead(slot, conn); releaseRead(source, conn);
} }
}; };
} }
@ -211,7 +212,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
@Override @Override
public PubSubConnectionEntry getEntry(String channelName) { public PubSubConnectionEntry getPubSubEntry(String channelName) {
return name2PubSubConnection.get(channelName); return name2PubSubConnection.get(channelName);
} }
@ -463,11 +464,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entryCodec; return entryCodec;
} }
public MasterSlaveEntry getEntry(InetSocketAddress addr) {
// TODO optimize
for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : entries.entrySet()) {
if (entry.getValue().getClient().getAddr().equals(addr)) {
return entry.getValue();
}
}
return null;
}
protected MasterSlaveEntry getEntry(ClusterSlotRange slotRange) { protected MasterSlaveEntry getEntry(ClusterSlotRange slotRange) {
return entries.get(slotRange); return entries.get(slotRange);
} }
protected MasterSlaveEntry getEntry(int slot) { protected MasterSlaveEntry getEntry(int slot) {
// TODO optimize
for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : entries.entrySet()) { for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : entries.entrySet()) {
if (entry.getKey().isOwn(slot)) { if (entry.getKey().isOwn(slot)) {
return entry.getValue(); return entry.getValue();
@ -545,29 +557,52 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
@Override @Override
public Future<RedisConnection> connectionWriteOp(int slot, RedisCommand<?> command) { public Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(slot); MasterSlaveEntry e = getEntry(source, command);
if (e == null) {
throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot);
}
return e.connectionWriteOp(); return e.connectionWriteOp();
} }
@Override private MasterSlaveEntry getEntry(NodeSource source) {
public Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command) { MasterSlaveEntry e = null;
MasterSlaveEntry e = getEntry(slot); if (source.getSlot() != null) {
if (e == null) { e = getEntry(source.getSlot());
throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot); if (e == null) {
throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot());
}
} else {
e = getEntry(source.getAddr());
if (e == null) {
throw new RedisNodeNotFoundException("No node with addr: " + source.getAddr());
}
} }
return e;
}
private MasterSlaveEntry getEntry(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = null;
if (source.getSlot() != null) {
e = getEntry(source.getSlot());
if (e == null) {
throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command);
}
} else {
e = getEntry(source.getAddr());
if (e == null) {
throw new RedisNodeNotFoundException("No node for addr: " + source.getAddr() + " and command " + command);
}
}
return e;
}
@Override
public Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(source, command);
return e.connectionReadOp(); return e.connectionReadOp();
} }
@Override @Override
public Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command, RedisClient client) { public Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command, RedisClient client) {
MasterSlaveEntry e = getEntry(slot); MasterSlaveEntry e = getEntry(source, command);
if (e == null) {
throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot);
}
return e.connectionReadOp(client); return e.connectionReadOp(client);
} }
@ -580,13 +615,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
@Override @Override
public void releaseWrite(int slot, RedisConnection connection) { public void releaseWrite(NodeSource source, RedisConnection connection) {
getEntry(slot).releaseWrite(connection); getEntry(source).releaseWrite(connection);
} }
@Override @Override
public void releaseRead(int slot, RedisConnection connection) { public void releaseRead(NodeSource source, RedisConnection connection) {
getEntry(slot).releaseRead(connection); getEntry(source).releaseRead(connection);
} }
@Override @Override

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
public class NodeSource {
private final Integer slot;
private final InetSocketAddress addr;
public NodeSource(Integer slot) {
this(slot, null);
}
public NodeSource(InetSocketAddress addr) {
this(null, addr);
}
private NodeSource(Integer slot, InetSocketAddress addr) {
this.slot = slot;
this.addr = addr;
}
public Integer getSlot() {
return slot;
}
public InetSocketAddress getAddr() {
return addr;
}
}
Loading…
Cancel
Save