From c72e6ad943f96df8b6556a1b4360532cac4451cf Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 21 Mar 2018 17:53:18 +0300 Subject: [PATCH] Fixed - "READONLY can't write against a read only slave" error. #1272 #945 --- .../connection/ClientConnectionsEntry.java | 20 +-- .../MasterSlaveConnectionManager.java | 12 +- .../redisson/connection/MasterSlaveEntry.java | 23 ++-- .../connection/pool/ConnectionPool.java | 12 +- .../test/java/org/redisson/RedissonTest.java | 129 ++++++++++++++++++ 5 files changed, 168 insertions(+), 28 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index b622a457d..b826154ea 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -141,6 +141,11 @@ public class ClientConnectionsEntry { } public void releaseConnection(RedisConnection connection) { + if (client != connection.getRedisClient()) { + connection.closeAsync(); + return; + } + connection.setLastUsageTime(System.currentTimeMillis()); freeConnections.add(connection); } @@ -215,6 +220,11 @@ public class ClientConnectionsEntry { } public void releaseSubscribeConnection(RedisPubSubConnection connection) { + if (client != connection.getRedisClient()) { + connection.closeAsync(); + return; + } + connection.setLastUsageTime(System.currentTimeMillis()); freeSubscribeConnections.add(connection); } @@ -227,17 +237,11 @@ public class ClientConnectionsEntry { freeSubscribeConnectionsCounter.release(); } - public boolean freezeMaster(FreezeReason reason) { + public void freezeMaster(FreezeReason reason) { synchronized (this) { setFreezed(true); - // only RECONNECT freeze reason could be replaced - if (getFreezeReason() == null - || getFreezeReason() == FreezeReason.RECONNECT) { - setFreezeReason(reason); - return true; - } + setFreezeReason(reason); } - return false; } @Override diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 86f8bbf58..d2ad9c769 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -528,10 +528,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected final void changeMaster(int slot, URI address) { - MasterSlaveEntry entry = getEntry(slot); + final MasterSlaveEntry entry = getEntry(slot); client2entry.remove(entry.getClient()); - entry.changeMaster(address); - client2entry.put(entry.getClient(), entry); + entry.changeMaster(address).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + client2entry.put(entry.getClient(), entry); + } + } + }); } protected final void addEntry(Integer slot, MasterSlaveEntry entry) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index ba06d3397..cea2c62f2 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -368,8 +368,9 @@ public class MasterSlaveEntry { // exclude master from slaves if (!config.checkSkipSlavesInit() && !addr.equals(entry.getClient().getAddr())) { - slaveDown(addr, FreezeReason.SYSTEM); - log.info("master {} excluded from slaves", addr); + if (slaveDown(addr, FreezeReason.SYSTEM)) { + log.info("master {} excluded from slaves", addr); + } } return true; } @@ -387,8 +388,9 @@ public class MasterSlaveEntry { // exclude master from slaves if (!config.checkSkipSlavesInit() && !URIBuilder.compare(addr, address)) { - slaveDown(addr, FreezeReason.SYSTEM); - log.info("master {} excluded from slaves", addr); + if (slaveDown(addr, FreezeReason.SYSTEM)) { + log.info("master {} excluded from slaves", addr); + } } return true; } @@ -402,8 +404,9 @@ public class MasterSlaveEntry { // exclude master from slaves if (!config.checkSkipSlavesInit() && !addr.equals(address)) { - slaveDown(addr, FreezeReason.SYSTEM); - log.info("master {} excluded from slaves", addr); + if (slaveDown(addr, FreezeReason.SYSTEM)) { + log.info("master {} excluded from slaves", addr); + } } return true; } @@ -415,11 +418,13 @@ public class MasterSlaveEntry { * Shutdown old master client. * * @param address of Redis + * @return */ - public void changeMaster(URI address) { + public RFuture changeMaster(URI address) { final ClientConnectionsEntry oldMaster = masterEntry; RFuture future = setupMasterEntry(address); changeMaster(address, oldMaster, future); + return future; } public void changeMaster(InetSocketAddress address, URI uri) { @@ -469,10 +474,6 @@ public class MasterSlaveEntry { return masterEntry.getFreezeReason(); } - public void freeze() { - masterEntry.freezeMaster(FreezeReason.MANAGER); - } - public void unfreeze() { masterEntry.resetFirstFail(); synchronized (masterEntry) { diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 3735e4fd5..4966f760c 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -178,13 +178,13 @@ abstract class ConnectionPool { } } - List failedAttempts = new LinkedList(); + List failed = new LinkedList(); List freezed = new LinkedList(); for (ClientConnectionsEntry entry : entries) { - if (entry.isFreezed()) { + if (entry.isFailed()) { + failed.add(entry.getClient().getAddr()); + } else if (entry.isFreezed()) { freezed.add(entry.getClient().getAddr()); - } else { - failedAttempts.add(entry.getClient().getAddr()); } } @@ -192,8 +192,8 @@ abstract class ConnectionPool { if (!freezed.isEmpty()) { errorMsg.append(" Disconnected hosts: " + freezed); } - if (!failedAttempts.isEmpty()) { - errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts); + if (!failed.isEmpty()) { + errorMsg.append(" Hosts disconnected due to errors during `failedSlaveCheckInterval`: " + failed); } RedisConnectionException exception = new RedisConnectionException(errorMsg.toString()); diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 0f6507289..ef8a2ebbe 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -6,10 +6,13 @@ import static org.redisson.BaseTest.createInstance; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -28,8 +31,12 @@ import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.ClusterNode; import org.redisson.api.Node; import org.redisson.api.Node.InfoSection; +import org.redisson.api.listener.MessageListener; +import org.redisson.api.listener.StatusListener; import org.redisson.api.NodesGroup; +import org.redisson.api.RFuture; import org.redisson.api.RMap; +import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnectionException; @@ -39,6 +46,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.codec.SerializationCodec; import org.redisson.config.Config; import org.redisson.connection.ConnectionListener; +import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.misc.HashValue; import io.netty.buffer.Unpooled; @@ -295,6 +303,127 @@ public class RedissonTest { await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1); } + @Test + public void testFailoverInSentinel() throws Exception { + RedisRunner.RedisProcess master = new RedisRunner() + .nosave() + .randomDir() + .run(); + RedisRunner.RedisProcess slave1 = new RedisRunner() + .port(6380) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + RedisRunner.RedisProcess slave2 = new RedisRunner() + .port(6381) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + RedisRunner.RedisProcess sentinel1 = new RedisRunner() + .nosave() + .randomDir() + .port(26379) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + RedisRunner.RedisProcess sentinel2 = new RedisRunner() + .nosave() + .randomDir() + .port(26380) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + RedisRunner.RedisProcess sentinel3 = new RedisRunner() + .nosave() + .randomDir() + .port(26381) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + + Thread.sleep(5000); + + Config config = new Config(); + config.useSentinelServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); + RedissonClient redisson = Redisson.create(config); + + List> futures = new ArrayList>(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread() { + public void run() { + for (int i = 0; i < 1000; i++) { + RFuture f1 = redisson.getBucket("i" + i).getAsync(); + RFuture f2 = redisson.getBucket("i" + i).setAsync(""); + RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); + futures.add(f1); + futures.add(f2); + futures.add(f3); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + latch.countDown(); + }; + }; + t.start(); + + master.stop(); + System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!"); + + Thread.sleep(TimeUnit.SECONDS.toMillis(70)); + + master = new RedisRunner() + .port(master.getRedisServerPort()) + .nosave() + .randomDir() + .run(); + + System.out.println("master " + master.getRedisServerAddressAndPort() + " started!"); + + + Thread.sleep(15000); + + latch.await(); + + int errors = 0; + int success = 0; + int readonlyErrors = 0; + + for (RFuture rFuture : futures) { + rFuture.awaitUninterruptibly(); + if (!rFuture.isSuccess()) { + System.out.println("cause " + rFuture.cause()); + if (rFuture.cause().getMessage().contains("READONLY You can't write against")) { + readonlyErrors++; + } + errors++; + } else { + success++; + } + } + + System.out.println("errors " + errors + " success " + success + " readonly " + readonlyErrors); + + assertThat(errors).isLessThan(600); + assertThat(readonlyErrors).isZero(); + + redisson.shutdown(); + sentinel1.stop(); + sentinel2.stop(); + sentinel3.stop(); + master.stop(); + slave1.stop(); + slave2.stop(); + } + + @Test public void testReconnection() throws IOException, InterruptedException, TimeoutException { RedisProcess runner = new RedisRunner()