diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 96081f34c..280fb0e41 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.client.RedisAskException; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; @@ -36,6 +37,7 @@ import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.NodeSource; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -102,15 +104,15 @@ public class CommandBatchExecutorService extends CommandExecutorService { } @Override - protected void async(boolean readOnlyMode, int slot, MultiDecoder messageDecoder, + protected void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder messageDecoder, Codec codec, RedisCommand command, Object[] params, Promise mainPromise, RedisClient client, int attempt) { if (executed) { throw new IllegalStateException("Batch already executed!"); } - Entry entry = commands.get(slot); + Entry entry = commands.get(nodeSource.getSlot()); if (entry == null) { entry = new Entry(); - Entry oldEntry = commands.putIfAbsent(slot, entry); + Entry oldEntry = commands.putIfAbsent(nodeSource.getSlot(), entry); if (oldEntry != null) { entry = oldEntry; } @@ -143,6 +145,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { promise.setFailure(future.cause()); + commands = null; return; } @@ -162,12 +165,12 @@ public class CommandBatchExecutorService extends CommandExecutorService { AtomicInteger slots = new AtomicInteger(commands.size()); for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), e.getKey(), voidPromise, slots, 0); + execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0); } return promise; } - public void execute(final Entry entry, final int slot, final Promise mainPromise, final AtomicInteger slots, final int attempt) { + public void execute(final Entry entry, final NodeSource source, final Promise mainPromise, final AtomicInteger slots, final int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); return; @@ -189,15 +192,15 @@ public class CommandBatchExecutorService extends CommandExecutorService { attemptPromise.cancel(true); int count = attempt + 1; - execute(entry, slot, mainPromise, slots, count); + execute(entry, source, mainPromise, slots, count); } }; Future connectionFuture; if (entry.isReadOnlyMode()) { - connectionFuture = connectionManager.connectionReadOp(slot, null); + connectionFuture = connectionManager.connectionReadOp(source, null); } else { - connectionFuture = connectionManager.connectionWriteOp(slot, null); + connectionFuture = connectionManager.connectionWriteOp(source, null); } connectionFuture.addListener(new FutureListener() { @@ -235,9 +238,9 @@ public class CommandBatchExecutorService extends CommandExecutorService { }); if (entry.isReadOnlyMode()) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout)); } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout)); } } }); @@ -251,7 +254,12 @@ public class CommandBatchExecutorService extends CommandExecutorService { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); - execute(entry, ex.getSlot(), mainPromise, slots, attempt); + execute(entry, new NodeSource(ex.getSlot()), mainPromise, slots, attempt); + return; + } + if (future.cause() instanceof RedisAskException) { + RedisAskException ex = (RedisAskException)future.cause(); + execute(entry, new NodeSource(ex.getAddr()), mainPromise, slots, attempt); return; } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index ec0c63dd6..0fa7fdfe5 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.CommandBatchExecutorService.CommandEntry; +import org.redisson.client.RedisAskException; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; @@ -34,10 +36,13 @@ import org.redisson.client.RedisTimeoutException; import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.NodeSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,7 +104,7 @@ public class CommandExecutorService implements CommandExecutor { }; for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0); + async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } @@ -136,7 +141,7 @@ public class CommandExecutorService implements CommandExecutor { }); ClusterSlotRange slot = slots.remove(0); - async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); + async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); } public Future writeAllAsync(RedisCommand command, Object ... params) { @@ -173,7 +178,7 @@ public class CommandExecutorService implements CommandExecutor { } }; for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0); + async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } @@ -215,14 +220,14 @@ public class CommandExecutorService implements CommandExecutor { public Future readAsync(RedisClient client, String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(true, slot, null, codec, command, params, mainPromise, client, 0); + async(true, new NodeSource(slot), null, codec, command, params, mainPromise, client, 0); return mainPromise; } public Future readAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(true, slot, null, codec, command, params, mainPromise, null, 0); + async(true, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); return mainPromise; } @@ -233,7 +238,7 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - async(false, slot, null, codec, command, params, mainPromise, null, 0); + async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); return mainPromise; } @@ -243,15 +248,15 @@ public class CommandExecutorService implements CommandExecutor { public R write(String key, Codec codec, SyncOperation operation) { int slot = connectionManager.calcSlot(key); - return async(false, codec, slot, operation, 0); + return async(false, codec, new NodeSource(slot), operation, 0); } public R read(String key, Codec codec, SyncOperation operation) { int slot = connectionManager.calcSlot(key); - return async(true, codec, slot, operation, 0); + return async(true, codec, new NodeSource(slot), operation, 0); } - private R async(boolean readOnlyMode, Codec codec, int slot, SyncOperation operation, int attempt) { + private R async(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation operation, int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { return null; } @@ -259,9 +264,9 @@ public class CommandExecutorService implements CommandExecutor { try { Future connectionFuture; if (readOnlyMode) { - connectionFuture = connectionManager.connectionReadOp(slot, null); + connectionFuture = connectionManager.connectionReadOp(source, null); } else { - connectionFuture = connectionManager.connectionWriteOp(slot, null); + connectionFuture = connectionManager.connectionWriteOp(source, null); } connectionFuture.syncUninterruptibly(); @@ -270,19 +275,21 @@ public class CommandExecutorService implements CommandExecutor { try { return operation.execute(codec, connection); } catch (RedisMovedException e) { - return async(readOnlyMode, codec, e.getSlot(), operation, attempt); + return async(readOnlyMode, codec, new NodeSource(e.getSlot()), operation, attempt); + } catch (RedisAskException e) { + return async(readOnlyMode, codec, new NodeSource(e.getAddr()), operation, attempt); } catch (RedisTimeoutException e) { if (attempt == connectionManager.getConfig().getRetryAttempts()) { throw e; } attempt++; - return async(readOnlyMode, codec, slot, operation, attempt); + return async(readOnlyMode, codec, source, operation, attempt); } finally { connectionManager.getShutdownLatch().release(); if (readOnlyMode) { - connectionManager.releaseRead(slot, connection); + connectionManager.releaseRead(source, connection); } else { - connectionManager.releaseWrite(slot, connection); + connectionManager.releaseWrite(source, connection); } } } catch (RedisException e) { @@ -295,7 +302,7 @@ public class CommandExecutorService implements CommandExecutor { Thread.currentThread().interrupt(); } attempt++; - return async(readOnlyMode, codec, slot, operation, attempt); + return async(readOnlyMode, codec, source, operation, attempt); } } @@ -355,7 +362,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); + async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); } return mainPromise; } @@ -368,7 +375,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); int slot = connectionManager.calcSlot(key); - async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, null, 0); + async(readOnlyMode, new NodeSource(slot), null, codec, evalCommandType, args.toArray(), mainPromise, null, 0); return mainPromise; } @@ -398,11 +405,11 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(false, slot, null, codec, command, params, mainPromise, null, 0); + async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0); return mainPromise; } - protected void async(final boolean readOnlyMode, final int slot, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, + protected void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, final Object[] params, final Promise mainPromise, final RedisClient client, final int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); @@ -427,7 +434,7 @@ public class CommandExecutorService implements CommandExecutor { } int count = attempt + 1; - async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, client, count); + async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, client, count); } }; @@ -437,12 +444,12 @@ public class CommandExecutorService implements CommandExecutor { Future connectionFuture; if (readOnlyMode) { if (client != null) { - connectionFuture = connectionManager.connectionReadOp(slot, command, client); + connectionFuture = connectionManager.connectionReadOp(source, command, client); } else { - connectionFuture = connectionManager.connectionReadOp(slot, command); + connectionFuture = connectionManager.connectionReadOp(source, command); } } else { - connectionFuture = connectionManager.connectionWriteOp(slot, command); + connectionFuture = connectionManager.connectionWriteOp(source, command); } connectionFuture.addListener(new FutureListener() { @@ -463,8 +470,20 @@ public class CommandExecutorService implements CommandExecutor { RedisConnection connection = connFuture.getNow(); - log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr()); - ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); + ChannelFuture future = null; + if (source.getAddr() != null) { + // ASK handling + List> list = new ArrayList>(2); + Promise promise = connectionManager.newPromise(); + list.add(new CommandData(promise, codec, RedisCommands.ASKING, new Object[] {})); + list.add(new CommandData(attemptPromise, messageDecoder, codec, command, params)); + Promise main = connectionManager.newPromise(); + future = connection.send(new CommandsData(main, list)); + } else { + log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr()); + future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); + } + future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -481,9 +500,9 @@ public class CommandExecutorService implements CommandExecutor { }); if (readOnlyMode) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout)); } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); + attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout)); } } }); @@ -503,7 +522,18 @@ public class CommandExecutorService implements CommandExecutor { RedisMovedException ex = (RedisMovedException)future.cause(); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, client, attempt); + async(readOnlyMode, new NodeSource(ex.getSlot()), messageDecoder, codec, command, params, mainPromise, client, attempt); + return; + } + + if (future.cause() instanceof RedisAskException) { + if (!connectionManager.getShutdownLatch().acquire()) { + return; + } + + RedisAskException ex = (RedisAskException)future.cause(); + connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + async(readOnlyMode, new NodeSource(ex.getAddr()), messageDecoder, codec, command, params, mainPromise, client, attempt); return; } diff --git a/src/main/java/org/redisson/RedissonPatternTopic.java b/src/main/java/org/redisson/RedissonPatternTopic.java index 86878187f..90d9ae2f2 100644 --- a/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/src/main/java/org/redisson/RedissonPatternTopic.java @@ -77,7 +77,7 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(int listenerId) { - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { return; } diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index e1ccdafb8..f4883108e 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -92,7 +92,7 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(int listenerId) { - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { return; } diff --git a/src/main/java/org/redisson/client/RedisAskException.java b/src/main/java/org/redisson/client/RedisAskException.java new file mode 100644 index 000000000..e85e6e1bb --- /dev/null +++ b/src/main/java/org/redisson/client/RedisAskException.java @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client; + +import java.net.InetSocketAddress; +import java.net.URI; + +public class RedisAskException extends RedisException { + + private static final long serialVersionUID = -6969734163155547631L; + + private URI url; + + public RedisAskException(String url) { + this.url = URI.create("//" + url); + } + + public URI getUrl() { + return url; + } + + public InetSocketAddress getAddr() { + return new InetSocketAddress(url.getHost(), url.getPort()); + } + +} diff --git a/src/main/java/org/redisson/client/RedisEmptySlotException.java b/src/main/java/org/redisson/client/RedisNodeNotFoundException.java similarity index 76% rename from src/main/java/org/redisson/client/RedisEmptySlotException.java rename to src/main/java/org/redisson/client/RedisNodeNotFoundException.java index 6e684dee8..042e167c2 100644 --- a/src/main/java/org/redisson/client/RedisEmptySlotException.java +++ b/src/main/java/org/redisson/client/RedisNodeNotFoundException.java @@ -15,19 +15,12 @@ */ package org.redisson.client; -public class RedisEmptySlotException extends RedisException { +public class RedisNodeNotFoundException extends RedisException { private static final long serialVersionUID = -4756928186967834601L; - private final int slot; - - public RedisEmptySlotException(String msg, int slot) { + public RedisNodeNotFoundException(String msg) { super(msg); - this.slot = slot; - } - - public int getSlot() { - return slot; } } diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 13113f756..308eb89e9 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.redisson.client.RedisAskException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisPubSubConnection; @@ -160,10 +161,9 @@ public class CommandDecoder extends ReplayingDecoder { String[] errorParts = error.split(" "); int slot = Integer.valueOf(errorParts[1]); data.getPromise().setFailure(new RedisMovedException(slot)); - } else if (error.startsWith("(error) ASK")) { + } else if (error.startsWith("ASK")) { String[] errorParts = error.split(" "); - int slot = Integer.valueOf(errorParts[2]); - data.getPromise().setFailure(new RedisMovedException(slot)); + data.getPromise().setFailure(new RedisAskException(errorParts[2])); } else { data.getPromise().setFailure(new RedisException(error + ". channel: " + channel + " command: " + data)); } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 1e1606a46..b902987ad 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -49,6 +49,8 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; public interface RedisCommands { + RedisStrictCommand ASKING = new RedisStrictCommand("ASKING", new VoidReplayConvertor()); + RedisCommand ZADD = new RedisCommand("ZADD", new BooleanAmountReplayConvertor(), 3); RedisCommand ZREM = new RedisCommand("ZREM", new BooleanAmountReplayConvertor(), 2); RedisStrictCommand ZCARD = new RedisStrictCommand("ZCARD", new IntegerReplayConvertor()); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 3d67e1b8b..e20fdc71f 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -60,27 +61,29 @@ public interface ConnectionManager { Promise newPromise(); - void releaseRead(int slot, RedisConnection connection); + void releaseRead(NodeSource source, RedisConnection connection); - void releaseWrite(int slot, RedisConnection connection); + void releaseWrite(NodeSource source, RedisConnection connection); - Future connectionReadOp(int slot, RedisCommand command); + Future connectionReadOp(NodeSource source, RedisCommand command); - Future connectionReadOp(int slot, RedisCommand command, RedisClient client); + Future connectionReadOp(NodeSource source, RedisCommand command, RedisClient client); - Future connectionWriteOp(int slot, RedisCommand command); + Future connectionWriteOp(NodeSource source, RedisCommand command); - FutureListener createReleaseReadListener(int slot, + FutureListener createReleaseReadListener(NodeSource source, RedisConnection conn, Timeout timeout); - FutureListener createReleaseWriteListener(int slot, + FutureListener createReleaseWriteListener(NodeSource source, RedisConnection conn, Timeout timeout); RedisClient createClient(String host, int port, int timeout); RedisClient createClient(String host, int port); - PubSubConnectionEntry getEntry(String channelName); + MasterSlaveEntry getEntry(InetSocketAddress addr); + + PubSubConnectionEntry getPubSubEntry(String channelName); Future subscribe(String channelName, Codec codec); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9aa3e8fce..17f2ff3fe 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -29,7 +30,7 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisEmptySlotException; +import org.redisson.client.RedisNodeNotFoundException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; @@ -168,7 +169,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public FutureListener createReleaseWriteListener(final int slot, + public FutureListener createReleaseWriteListener(final NodeSource source, final RedisConnection conn, final Timeout timeout) { return new FutureListener() { @Override @@ -181,13 +182,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { shutdownLatch.release(); timeout.cancel(); - releaseWrite(slot, conn); + releaseWrite(source, conn); } }; } @Override - public FutureListener createReleaseReadListener(final int slot, + public FutureListener createReleaseReadListener(final NodeSource source, final RedisConnection conn, final Timeout timeout) { return new FutureListener() { @Override @@ -200,7 +201,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { shutdownLatch.release(); timeout.cancel(); - releaseRead(slot, conn); + releaseRead(source, conn); } }; } @@ -211,7 +212,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public PubSubConnectionEntry getEntry(String channelName) { + public PubSubConnectionEntry getPubSubEntry(String channelName) { return name2PubSubConnection.get(channelName); } @@ -463,11 +464,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return entryCodec; } + public MasterSlaveEntry getEntry(InetSocketAddress addr) { + // TODO optimize + for (Entry entry : entries.entrySet()) { + if (entry.getValue().getClient().getAddr().equals(addr)) { + return entry.getValue(); + } + } + return null; + } + protected MasterSlaveEntry getEntry(ClusterSlotRange slotRange) { return entries.get(slotRange); } protected MasterSlaveEntry getEntry(int slot) { + // TODO optimize for (Entry entry : entries.entrySet()) { if (entry.getKey().isOwn(slot)) { return entry.getValue(); @@ -545,29 +557,52 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Future connectionWriteOp(int slot, RedisCommand command) { - MasterSlaveEntry e = getEntry(slot); - if (e == null) { - throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot); - } + public Future connectionWriteOp(NodeSource source, RedisCommand command) { + MasterSlaveEntry e = getEntry(source, command); return e.connectionWriteOp(); } - @Override - public Future connectionReadOp(int slot, RedisCommand command) { - MasterSlaveEntry e = getEntry(slot); - if (e == null) { - throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot); + private MasterSlaveEntry getEntry(NodeSource source) { + MasterSlaveEntry e = null; + if (source.getSlot() != null) { + e = getEntry(source.getSlot()); + if (e == null) { + throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot()); + } + } else { + e = getEntry(source.getAddr()); + if (e == null) { + throw new RedisNodeNotFoundException("No node with addr: " + source.getAddr()); + } } + return e; + } + + private MasterSlaveEntry getEntry(NodeSource source, RedisCommand command) { + MasterSlaveEntry e = null; + if (source.getSlot() != null) { + e = getEntry(source.getSlot()); + if (e == null) { + throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command); + } + } else { + e = getEntry(source.getAddr()); + if (e == null) { + throw new RedisNodeNotFoundException("No node for addr: " + source.getAddr() + " and command " + command); + } + } + return e; + } + + @Override + public Future connectionReadOp(NodeSource source, RedisCommand command) { + MasterSlaveEntry e = getEntry(source, command); return e.connectionReadOp(); } @Override - public Future connectionReadOp(int slot, RedisCommand command, RedisClient client) { - MasterSlaveEntry e = getEntry(slot); - if (e == null) { - throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot); - } + public Future connectionReadOp(NodeSource source, RedisCommand command, RedisClient client) { + MasterSlaveEntry e = getEntry(source, command); return e.connectionReadOp(client); } @@ -580,13 +615,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public void releaseWrite(int slot, RedisConnection connection) { - getEntry(slot).releaseWrite(connection); + public void releaseWrite(NodeSource source, RedisConnection connection) { + getEntry(source).releaseWrite(connection); } @Override - public void releaseRead(int slot, RedisConnection connection) { - getEntry(slot).releaseRead(connection); + public void releaseRead(NodeSource source, RedisConnection connection) { + getEntry(source).releaseRead(connection); } @Override diff --git a/src/main/java/org/redisson/connection/NodeSource.java b/src/main/java/org/redisson/connection/NodeSource.java new file mode 100644 index 000000000..df976d316 --- /dev/null +++ b/src/main/java/org/redisson/connection/NodeSource.java @@ -0,0 +1,46 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection; + +import java.net.InetSocketAddress; + +public class NodeSource { + + private final Integer slot; + private final InetSocketAddress addr; + + public NodeSource(Integer slot) { + this(slot, null); + } + + public NodeSource(InetSocketAddress addr) { + this(null, addr); + } + + private NodeSource(Integer slot, InetSocketAddress addr) { + this.slot = slot; + this.addr = addr; + } + + public Integer getSlot() { + return slot; + } + + public InetSocketAddress getAddr() { + return addr; + } + +}