Empty slot error handling. #211

pull/218/head
Nikita 10 years ago
parent 2ac65d2fc2
commit 6b0c25471a

@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnectionWriteException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
@ -212,7 +212,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionWriteException("channel: " + future.channel() + " closed"));
ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
@ -223,7 +223,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
}
} catch (RedisConnectionException e) {
} catch (RedisException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
@ -233,7 +233,6 @@ public class CommandBatchExecutorService extends CommandExecutorService {
if (future.isCancelled()) {
return;
}
// TODO cancel timeout
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionWriteException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
@ -258,7 +258,7 @@ public class CommandExecutorService implements CommandExecutor {
connectionManager.releaseWrite(slot, connection);
}
}
} catch (RedisConnectionException e) {
} catch (RedisException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
@ -422,7 +422,7 @@ public class CommandExecutorService implements CommandExecutor {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionWriteException(
ex.set(new WriteRedisConnectionException(
"Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
@ -434,7 +434,7 @@ public class CommandExecutorService implements CommandExecutor {
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
}
} catch (RedisConnectionException e) {
} catch (RedisException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}

@ -0,0 +1,33 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
public class RedisEmptySlotException extends RedisException {
private static final long serialVersionUID = -4756928186967834601L;
private final int slot;
public RedisEmptySlotException(String msg, int slot) {
super(msg);
this.slot = slot;
}
public int getSlot() {
return slot;
}
}

@ -15,15 +15,15 @@
*/
package org.redisson.client;
public class RedisConnectionWriteException extends RedisException {
public class WriteRedisConnectionException extends RedisException {
private static final long serialVersionUID = -4756928186967834601L;
public RedisConnectionWriteException(String msg) {
public WriteRedisConnectionException(String msg) {
super(msg);
}
public RedisConnectionWriteException(String msg, Throwable e) {
public WriteRedisConnectionException(String msg, Throwable e) {
super(msg, e);
}

@ -105,7 +105,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
config.setMasterAddress(partition.getMasterAddress());
SingleEntry entry = new SingleEntry(this, config);
SingleEntry entry = new SingleEntry(partition.getStartSlot(), partition.getEndSlot(), this, config);
entries.put(partition.getEndSlot(), entry);
lastPartitions.put(partition.getEndSlot(), partition);
}

@ -32,6 +32,7 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisEmptySlotException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
@ -59,6 +60,8 @@ import io.netty.util.concurrent.Promise;
*/
public class MasterSlaveConnectionManager implements ConnectionManager {
static final int MAX_SLOT = 16384;
private final Logger log = LoggerFactory.getLogger(getClass());
private HashedWheelTimer timer;
@ -126,8 +129,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
protected void initEntry(MasterSlaveServersConfig config) {
MasterSlaveEntry entry = new MasterSlaveEntry(this, config);
entries.put(Integer.MAX_VALUE, entry);
MasterSlaveEntry entry = new MasterSlaveEntry(0, MAX_SLOT, this, config);
entries.put(MAX_SLOT, entry);
}
protected void init(Config cfg) {
@ -187,7 +190,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public int calcSlot(String key) {
if (entries.size() == 1 || key == null) {
return -1;
return 0;
}
int start = key.indexOf('{');
@ -196,7 +199,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
key = key.substring(start+1, end);
}
int result = CRC16.crc16(key.getBytes()) % 16384;
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
@ -234,14 +237,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
int slot = -1;
int slot = 0;
RedisPubSubConnection conn = nextPubSubConnection(slot);
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(slot, entry);
releaseSubscribeConnection(slot, entry);
return oldEntry;
}
@ -283,14 +286,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
int slot = -1;
int slot = 0;
RedisPubSubConnection conn = nextPubSubConnection(slot);
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(slot, entry);
releaseSubscribeConnection(slot, entry);
return oldEntry;
}
@ -332,14 +335,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
int slot = -1;
int slot = 0;
RedisPubSubConnection conn = nextPubSubConnection(slot);
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(slot, entry);
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
@ -367,7 +370,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
releaseSubscribeConnection(0, entry);
}
}
return true;
@ -392,7 +395,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
releaseSubscribeConnection(0, entry);
}
}
return true;
@ -403,14 +406,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
});
}
protected MasterSlaveEntry getEntry() {
return getEntry(0);
}
protected MasterSlaveEntry getEntry(int slot) {
if (slot == -1) {
slot = 0;
}
return entries.ceilingEntry(slot).getValue();
}
@ -444,14 +440,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
protected void addSlave(String host, int port) {
getEntry().addSlave(host, port);
}
protected void slaveUp(String host, int port) {
getEntry().slaveUp(host, port);
}
protected void changeMaster(int endSlot, String host, int port) {
getEntry(endSlot).changeMaster(host, port);
}
@ -462,19 +450,27 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public RedisConnection connectionWriteOp(int slot) {
return getEntry(slot).connectionWriteOp();
MasterSlaveEntry e = getEntry(slot);
if (!e.isOwn(slot)) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
}
return e.connectionWriteOp();
}
@Override
public RedisConnection connectionReadOp(int slot) {
return getEntry(slot).connectionReadOp();
MasterSlaveEntry e = getEntry(slot);
if (!e.isOwn(slot)) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
}
return e.connectionReadOp();
}
RedisPubSubConnection nextPubSubConnection(int slot) {
return getEntry(slot).nextPubSubConnection();
}
protected void returnSubscribeConnection(int slot, PubSubConnectionEntry entry) {
protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry entry) {
this.getEntry(slot).returnSubscribeConnection(entry);
}

@ -44,7 +44,12 @@ public class MasterSlaveEntry {
final MasterSlaveServersConfig config;
final ConnectionManager connectionManager;
public MasterSlaveEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
final int startSlot;
final int endSlot;
public MasterSlaveEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
this.startSlot = startSlot;
this.endSlot = endSlot;
this.connectionManager = connectionManager;
this.config = config;
@ -168,4 +173,16 @@ public class MasterSlaveEntry {
slaveBalancer.shutdown();
}
public int getEndSlot() {
return endSlot;
}
public int getStartSlot() {
return startSlot;
}
public boolean isOwn(int slot) {
return slot >= startSlot && slot <= endSlot;
}
}

@ -178,7 +178,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
// to avoid freeze twice
if (freezeSlaves.add(slaveAddr)) {
log.debug("Slave has down - {}", slaveAddr);
slaveDown(-1, ip, Integer.valueOf(port));
slaveDown(0, ip, Integer.valueOf(port));
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
@ -217,7 +217,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (!newMaster.equals(current)
&& master.compareAndSet(current, newMaster)) {
log.debug("changing master from {} to {}", current, newMaster);
changeMaster(-1, ip, Integer.valueOf(port));
changeMaster(0, ip, Integer.valueOf(port));
}
}
} else {
@ -225,6 +225,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void addSlave(String host, int port) {
getEntry(0).addSlave(host, port);
}
private void slaveUp(String host, int port) {
getEntry(0).slaveUp(host, port);
}
@Override
public void shutdown() {
super.shutdown();

@ -41,8 +41,8 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
@Override
protected void initEntry(MasterSlaveServersConfig config) {
SingleEntry entry = new SingleEntry(this, config);
entries.put(Integer.MAX_VALUE, entry);
SingleEntry entry = new SingleEntry(0, MAX_SLOT, this, config);
entries.put(MAX_SLOT, entry);
}
}

@ -24,8 +24,8 @@ import org.redisson.client.protocol.RedisCommands;
public class SingleEntry extends MasterSlaveEntry {
public SingleEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(connectionManager, config);
public SingleEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(startSlot, endSlot, connectionManager, config);
}
@Override

@ -5,7 +5,7 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisConnectionWriteException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.codec.SerializationCodec;
import org.redisson.core.ClusterNode;
import org.redisson.core.Node;
@ -17,7 +17,7 @@ public class RedissonTest extends BaseTest {
private String field;
}
@Test(expected = RedisConnectionWriteException.class)
@Test(expected = WriteRedisConnectionException.class)
public void testSer() {
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379");

Loading…
Cancel
Save