diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 2eedbe329..330b94736 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.client.RedisConnectionWriteException; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; @@ -212,7 +212,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { timeout.cancel(); - ex.set(new RedisConnectionWriteException("channel: " + future.channel() + " closed")); + ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed")); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } } @@ -223,7 +223,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { } else { attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); } - } catch (RedisConnectionException e) { + } catch (RedisException e) { ex.set(e); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } @@ -233,7 +233,6 @@ public class CommandBatchExecutorService extends CommandExecutorService { if (future.isCancelled()) { return; } - // TODO cancel timeout if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 05d0efcca..9f3170350 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisConnectionWriteException; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; @@ -258,7 +258,7 @@ public class CommandExecutorService implements CommandExecutor { connectionManager.releaseWrite(slot, connection); } } - } catch (RedisConnectionException e) { + } catch (RedisException e) { if (attempt == connectionManager.getConfig().getRetryAttempts()) { throw e; } @@ -422,7 +422,7 @@ public class CommandExecutorService implements CommandExecutor { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { timeout.cancel(); - ex.set(new RedisConnectionWriteException( + ex.set(new WriteRedisConnectionException( "Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause())); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } @@ -434,7 +434,7 @@ public class CommandExecutorService implements CommandExecutor { } else { attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); } - } catch (RedisConnectionException e) { + } catch (RedisException e) { ex.set(e); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } diff --git a/src/main/java/org/redisson/client/RedisEmptySlotException.java b/src/main/java/org/redisson/client/RedisEmptySlotException.java new file mode 100644 index 000000000..6e684dee8 --- /dev/null +++ b/src/main/java/org/redisson/client/RedisEmptySlotException.java @@ -0,0 +1,33 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client; + +public class RedisEmptySlotException extends RedisException { + + private static final long serialVersionUID = -4756928186967834601L; + + private final int slot; + + public RedisEmptySlotException(String msg, int slot) { + super(msg); + this.slot = slot; + } + + public int getSlot() { + return slot; + } + +} diff --git a/src/main/java/org/redisson/client/RedisConnectionWriteException.java b/src/main/java/org/redisson/client/WriteRedisConnectionException.java similarity index 81% rename from src/main/java/org/redisson/client/RedisConnectionWriteException.java rename to src/main/java/org/redisson/client/WriteRedisConnectionException.java index 5b08a22a0..0414a1235 100644 --- a/src/main/java/org/redisson/client/RedisConnectionWriteException.java +++ b/src/main/java/org/redisson/client/WriteRedisConnectionException.java @@ -15,15 +15,15 @@ */ package org.redisson.client; -public class RedisConnectionWriteException extends RedisException { +public class WriteRedisConnectionException extends RedisException { private static final long serialVersionUID = -4756928186967834601L; - public RedisConnectionWriteException(String msg) { + public WriteRedisConnectionException(String msg) { super(msg); } - public RedisConnectionWriteException(String msg, Throwable e) { + public WriteRedisConnectionException(String msg, Throwable e) { super(msg, e); } diff --git a/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/src/main/java/org/redisson/connection/ClusterConnectionManager.java index 3b358d66a..7b9f60d01 100644 --- a/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -105,7 +105,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot()); config.setMasterAddress(partition.getMasterAddress()); - SingleEntry entry = new SingleEntry(this, config); + SingleEntry entry = new SingleEntry(partition.getStartSlot(), partition.getEndSlot(), this, config); entries.put(partition.getEndSlot(), entry); lastPartitions.put(partition.getEndSlot(), partition); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 5ee49fc2e..bc929b429 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -32,6 +32,7 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisEmptySlotException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; @@ -59,6 +60,8 @@ import io.netty.util.concurrent.Promise; */ public class MasterSlaveConnectionManager implements ConnectionManager { + static final int MAX_SLOT = 16384; + private final Logger log = LoggerFactory.getLogger(getClass()); private HashedWheelTimer timer; @@ -126,8 +129,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected void initEntry(MasterSlaveServersConfig config) { - MasterSlaveEntry entry = new MasterSlaveEntry(this, config); - entries.put(Integer.MAX_VALUE, entry); + MasterSlaveEntry entry = new MasterSlaveEntry(0, MAX_SLOT, this, config); + entries.put(MAX_SLOT, entry); } protected void init(Config cfg) { @@ -187,7 +190,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public int calcSlot(String key) { if (entries.size() == 1 || key == null) { - return -1; + return 0; } int start = key.indexOf('{'); @@ -196,7 +199,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { key = key.substring(start+1, end); } - int result = CRC16.crc16(key.getBytes()) % 16384; + int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; log.debug("slot {} for {}", result, key); return result; } @@ -234,14 +237,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - int slot = -1; + int slot = 0; RedisPubSubConnection conn = nextPubSubConnection(slot); PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { - returnSubscribeConnection(slot, entry); + releaseSubscribeConnection(slot, entry); return oldEntry; } @@ -283,14 +286,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - int slot = -1; + int slot = 0; RedisPubSubConnection conn = nextPubSubConnection(slot); PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { - returnSubscribeConnection(slot, entry); + releaseSubscribeConnection(slot, entry); return oldEntry; } @@ -332,14 +335,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - int slot = -1; + int slot = 0; RedisPubSubConnection conn = nextPubSubConnection(slot); PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { - returnSubscribeConnection(slot, entry); + releaseSubscribeConnection(slot, entry); return; } synchronized (entry) { @@ -367,7 +370,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { synchronized (entry) { if (entry.tryClose()) { - returnSubscribeConnection(-1, entry); + releaseSubscribeConnection(0, entry); } } return true; @@ -392,7 +395,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { synchronized (entry) { if (entry.tryClose()) { - returnSubscribeConnection(-1, entry); + releaseSubscribeConnection(0, entry); } } return true; @@ -403,14 +406,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { }); } - protected MasterSlaveEntry getEntry() { - return getEntry(0); - } - protected MasterSlaveEntry getEntry(int slot) { - if (slot == -1) { - slot = 0; - } return entries.ceilingEntry(slot).getValue(); } @@ -444,14 +440,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - protected void addSlave(String host, int port) { - getEntry().addSlave(host, port); - } - - protected void slaveUp(String host, int port) { - getEntry().slaveUp(host, port); - } - protected void changeMaster(int endSlot, String host, int port) { getEntry(endSlot).changeMaster(host, port); } @@ -462,19 +450,27 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RedisConnection connectionWriteOp(int slot) { - return getEntry(slot).connectionWriteOp(); + MasterSlaveEntry e = getEntry(slot); + if (!e.isOwn(slot)) { + throw new RedisEmptySlotException("No node for slot: " + slot, slot); + } + return e.connectionWriteOp(); } @Override public RedisConnection connectionReadOp(int slot) { - return getEntry(slot).connectionReadOp(); + MasterSlaveEntry e = getEntry(slot); + if (!e.isOwn(slot)) { + throw new RedisEmptySlotException("No node for slot: " + slot, slot); + } + return e.connectionReadOp(); } RedisPubSubConnection nextPubSubConnection(int slot) { return getEntry(slot).nextPubSubConnection(); } - protected void returnSubscribeConnection(int slot, PubSubConnectionEntry entry) { + protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry entry) { this.getEntry(slot).returnSubscribeConnection(entry); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index ee6accf66..f34ca95fb 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -44,7 +44,12 @@ public class MasterSlaveEntry { final MasterSlaveServersConfig config; final ConnectionManager connectionManager; - public MasterSlaveEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) { + final int startSlot; + final int endSlot; + + public MasterSlaveEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + this.startSlot = startSlot; + this.endSlot = endSlot; this.connectionManager = connectionManager; this.config = config; @@ -168,4 +173,16 @@ public class MasterSlaveEntry { slaveBalancer.shutdown(); } + public int getEndSlot() { + return endSlot; + } + + public int getStartSlot() { + return startSlot; + } + + public boolean isOwn(int slot) { + return slot >= startSlot && slot <= endSlot; + } + } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index b66f769eb..0874b3077 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -178,7 +178,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { // to avoid freeze twice if (freezeSlaves.add(slaveAddr)) { log.debug("Slave has down - {}", slaveAddr); - slaveDown(-1, ip, Integer.valueOf(port)); + slaveDown(0, ip, Integer.valueOf(port)); } } else { log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); @@ -217,7 +217,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { if (!newMaster.equals(current) && master.compareAndSet(current, newMaster)) { log.debug("changing master from {} to {}", current, newMaster); - changeMaster(-1, ip, Integer.valueOf(port)); + changeMaster(0, ip, Integer.valueOf(port)); } } } else { @@ -225,6 +225,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } + private void addSlave(String host, int port) { + getEntry(0).addSlave(host, port); + } + + private void slaveUp(String host, int port) { + getEntry(0).slaveUp(host, port); + } + @Override public void shutdown() { super.shutdown(); diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 07b794756..717725532 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -41,8 +41,8 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { @Override protected void initEntry(MasterSlaveServersConfig config) { - SingleEntry entry = new SingleEntry(this, config); - entries.put(Integer.MAX_VALUE, entry); + SingleEntry entry = new SingleEntry(0, MAX_SLOT, this, config); + entries.put(MAX_SLOT, entry); } } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 87e5a9b95..f4d3cc193 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -24,8 +24,8 @@ import org.redisson.client.protocol.RedisCommands; public class SingleEntry extends MasterSlaveEntry { - public SingleEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) { - super(connectionManager, config); + public SingleEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + super(startSlot, endSlot, connectionManager, config); } @Override diff --git a/src/test/java/org/redisson/RedissonTest.java b/src/test/java/org/redisson/RedissonTest.java index 9625f0b66..11118301d 100644 --- a/src/test/java/org/redisson/RedissonTest.java +++ b/src/test/java/org/redisson/RedissonTest.java @@ -5,7 +5,7 @@ import java.util.Map; import org.junit.Assert; import org.junit.Test; -import org.redisson.client.RedisConnectionWriteException; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.codec.SerializationCodec; import org.redisson.core.ClusterNode; import org.redisson.core.Node; @@ -17,7 +17,7 @@ public class RedissonTest extends BaseTest { private String field; } - @Test(expected = RedisConnectionWriteException.class) + @Test(expected = WriteRedisConnectionException.class) public void testSer() { Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6379");