Fixed - "READONLY can't write against a read only slave" error. #1272 #945

pull/1423/head
Nikita 7 years ago
parent 9a36a5badc
commit c72e6ad943

@ -141,6 +141,11 @@ public class ClientConnectionsEntry {
}
public void releaseConnection(RedisConnection connection) {
if (client != connection.getRedisClient()) {
connection.closeAsync();
return;
}
connection.setLastUsageTime(System.currentTimeMillis());
freeConnections.add(connection);
}
@ -215,6 +220,11 @@ public class ClientConnectionsEntry {
}
public void releaseSubscribeConnection(RedisPubSubConnection connection) {
if (client != connection.getRedisClient()) {
connection.closeAsync();
return;
}
connection.setLastUsageTime(System.currentTimeMillis());
freeSubscribeConnections.add(connection);
}
@ -227,17 +237,11 @@ public class ClientConnectionsEntry {
freeSubscribeConnectionsCounter.release();
}
public boolean freezeMaster(FreezeReason reason) {
public void freezeMaster(FreezeReason reason) {
synchronized (this) {
setFreezed(true);
// only RECONNECT freeze reason could be replaced
if (getFreezeReason() == null
|| getFreezeReason() == FreezeReason.RECONNECT) {
setFreezeReason(reason);
return true;
}
setFreezeReason(reason);
}
return false;
}
@Override

@ -528,10 +528,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
protected final void changeMaster(int slot, URI address) {
MasterSlaveEntry entry = getEntry(slot);
final MasterSlaveEntry entry = getEntry(slot);
client2entry.remove(entry.getClient());
entry.changeMaster(address);
client2entry.put(entry.getClient(), entry);
entry.changeMaster(address).addListener(new FutureListener<RedisClient>() {
@Override
public void operationComplete(Future<RedisClient> future) throws Exception {
if (future.isSuccess()) {
client2entry.put(entry.getClient(), entry);
}
}
});
}
protected final void addEntry(Integer slot, MasterSlaveEntry entry) {

@ -368,8 +368,9 @@ public class MasterSlaveEntry {
// exclude master from slaves
if (!config.checkSkipSlavesInit()
&& !addr.equals(entry.getClient().getAddr())) {
slaveDown(addr, FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr);
if (slaveDown(addr, FreezeReason.SYSTEM)) {
log.info("master {} excluded from slaves", addr);
}
}
return true;
}
@ -387,8 +388,9 @@ public class MasterSlaveEntry {
// exclude master from slaves
if (!config.checkSkipSlavesInit()
&& !URIBuilder.compare(addr, address)) {
slaveDown(addr, FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr);
if (slaveDown(addr, FreezeReason.SYSTEM)) {
log.info("master {} excluded from slaves", addr);
}
}
return true;
}
@ -402,8 +404,9 @@ public class MasterSlaveEntry {
// exclude master from slaves
if (!config.checkSkipSlavesInit()
&& !addr.equals(address)) {
slaveDown(addr, FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr);
if (slaveDown(addr, FreezeReason.SYSTEM)) {
log.info("master {} excluded from slaves", addr);
}
}
return true;
}
@ -415,11 +418,13 @@ public class MasterSlaveEntry {
* Shutdown old master client.
*
* @param address of Redis
* @return
*/
public void changeMaster(URI address) {
public RFuture<RedisClient> changeMaster(URI address) {
final ClientConnectionsEntry oldMaster = masterEntry;
RFuture<RedisClient> future = setupMasterEntry(address);
changeMaster(address, oldMaster, future);
return future;
}
public void changeMaster(InetSocketAddress address, URI uri) {
@ -469,10 +474,6 @@ public class MasterSlaveEntry {
return masterEntry.getFreezeReason();
}
public void freeze() {
masterEntry.freezeMaster(FreezeReason.MANAGER);
}
public void unfreeze() {
masterEntry.resetFirstFail();
synchronized (masterEntry) {

@ -178,13 +178,13 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> failed = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
for (ClientConnectionsEntry entry : entries) {
if (entry.isFreezed()) {
if (entry.isFailed()) {
failed.add(entry.getClient().getAddr());
} else if (entry.isFreezed()) {
freezed.add(entry.getClient().getAddr());
} else {
failedAttempts.add(entry.getClient().getAddr());
}
}
@ -192,8 +192,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
if (!freezed.isEmpty()) {
errorMsg.append(" Disconnected hosts: " + freezed);
}
if (!failedAttempts.isEmpty()) {
errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);
if (!failed.isEmpty()) {
errorMsg.append(" Hosts disconnected due to errors during `failedSlaveCheckInterval`: " + failed);
}
RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());

@ -6,10 +6,13 @@ import static org.redisson.BaseTest.createInstance;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -28,8 +31,12 @@ import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode;
import org.redisson.api.Node;
import org.redisson.api.Node.InfoSection;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnectionException;
@ -39,6 +46,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.HashValue;
import io.netty.buffer.Unpooled;
@ -295,6 +303,127 @@ public class RedissonTest {
await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1);
}
@Test
public void testFailoverInSentinel() throws Exception {
RedisRunner.RedisProcess master = new RedisRunner()
.nosave()
.randomDir()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
RedissonClient redisson = Redisson.create(config);
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 1000; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
futures.add(f1);
futures.add(f2);
futures.add(f3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
latch.countDown();
};
};
t.start();
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
master = new RedisRunner()
.port(master.getRedisServerPort())
.nosave()
.randomDir()
.run();
System.out.println("master " + master.getRedisServerAddressAndPort() + " started!");
Thread.sleep(15000);
latch.await();
int errors = 0;
int success = 0;
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
if (!rFuture.isSuccess()) {
System.out.println("cause " + rFuture.cause());
if (rFuture.cause().getMessage().contains("READONLY You can't write against")) {
readonlyErrors++;
}
errors++;
} else {
success++;
}
}
System.out.println("errors " + errors + " success " + success + " readonly " + readonlyErrors);
assertThat(errors).isLessThan(600);
assertThat(readonlyErrors).isZero();
redisson.shutdown();
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
}
@Test
public void testReconnection() throws IOException, InterruptedException, TimeoutException {
RedisProcess runner = new RedisRunner()

Loading…
Cancel
Save