refactoring

pull/5457/head
Nikita Koksharov 1 year ago
parent 2c1ab3df94
commit 4c44e1b588

@ -824,211 +824,97 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testResubscriptionAfterFailover() throws Exception {
// withSentinel((nodes, redisson) -> {
// Config config = redisson.getConfig();
// config.useSentinelServers()
// .setSubscriptionsPerConnection(20)
// .setSubscriptionConnectionPoolSize(200);
//
// RedissonClient redissonClient = Redisson.create(config);
//
// ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5);
//
// AtomicBoolean exceptionDetected = new AtomicBoolean(false);
//
// Deque<String> status = new ConcurrentLinkedDeque<>();
// Runnable rLockPayload =
// () -> {
// try {
// Integer randomLock = ThreadLocalRandom.current().nextInt(100);
// RLock lock = redissonClient.getLock(randomLock.toString());
// lock.lock(10, TimeUnit.SECONDS);
// lock.unlock();
// status.add("ok");
// } catch (Exception e) {
// status.add("failed");
// if (e.getCause().getMessage().contains("slaves were synced")) {
// return;
// }
// e.printStackTrace();
// exceptionDetected.set(true);
// }
// };
//
// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
//
// try {
// Thread.sleep(Duration.ofSeconds(10).toMillis());
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// RedisClientConfig masterConfig = new RedisClientConfig().setAddress("redis://127.0.0.1:" + nodes.get(0).getFirstMappedPort());
//
// //Failover from master to slave
// try {
// RedisClient.create(masterConfig).connect().sync(RedisCommands.SHUTDOWN);
// } catch (RedisTimeoutException e) {
// // node goes down, so this command times out waiting for the response
// }
//
// GenericContainer<?> slave =
// new GenericContainer<>("bitnami/redis")
//// .withNetwork(nodes.get(1).getNetwork())
// .withEnv("REDIS_REPLICATION_MODE", "slave")
// .withEnv("REDIS_MASTER_HOST", "redis")
// .withEnv("ALLOW_EMPTY_PASSWORD", "yes")
// .withNetworkAliases("slave2")
// .withExposedPorts(6379);
// nodes.add(2, slave);
// slave.setPortBindings(Arrays.asList("6312:6379"));
// slave.start();
//
// System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client ");
// try {
// Thread.sleep(Duration.ofSeconds(10).toMillis());
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// assertThat(exceptionDetected.get()).isFalse();
// assertThat(status.peekLast()).isEqualTo("ok");
//
// executor1.shutdown();
//
// redissonClient.shutdown();
//
// }, 1);
RedisRunner.RedisProcess master = new RedisRunner()
.nosave()
.randomDir()
.port(6400)
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6400)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6400, 2)
.sentinelDownAfterMilliseconds("myMaster", 750)
.sentinelFailoverTimeout("myMaster", 1250)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6400, 2)
.sentinelDownAfterMilliseconds("myMaster", 750)
.sentinelFailoverTimeout("myMaster", 1250)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6400, 2)
.sentinelDownAfterMilliseconds("myMaster", 750)
.sentinelFailoverTimeout("myMaster", 1250)
.run();
Thread.sleep(1000);
Config config = new Config();
config.useSentinelServers()
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster")
.setSubscriptionsPerConnection(20)
.setSubscriptionConnectionPoolSize(200);
RedissonClient redisson = Redisson.create(config);
ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5);
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
withSentinel((nodes, config) -> {
config.useSentinelServers()
.setSubscriptionsPerConnection(20)
.setSubscriptionConnectionPoolSize(200);
RedissonClient redissonClient = Redisson.create(config);
ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5);
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
Deque<String> status = new ConcurrentLinkedDeque<>();
Runnable rLockPayload =
() -> {
try {
Integer randomLock = ThreadLocalRandom.current().nextInt(100);
RLock lock = redissonClient.getLock(randomLock.toString());
lock.lock(10, TimeUnit.SECONDS);
lock.unlock();
status.add("ok");
} catch (Exception e) {
if (e.getMessage().contains("READONLY")) {
// skip
return;
}
Deque<String> status = new ConcurrentLinkedDeque<>();
Runnable rLockPayload =
() -> {
try {
Integer randomLock = ThreadLocalRandom.current().nextInt(100);
RLock lock = redisson.getLock(randomLock.toString());
lock.lock(10, TimeUnit.SECONDS);
lock.unlock();
status.add("ok");
} catch (Exception e) {
status.add("failed");
if (e.getCause().getMessage().contains("slaves were synced")) {
return;
status.add("failed");
if (e.getCause() != null
&& e.getCause().getMessage().contains("slaves were synced")) {
return;
}
e.printStackTrace();
exceptionDetected.set(true);
}
e.printStackTrace();
exceptionDetected.set(true);
}
};
};
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
Thread.sleep(java.time.Duration.ofSeconds(10).toMillis());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
RedisClientConfig masterConfig = new RedisClientConfig().setAddress(master.getRedisServerAddressAndPort());
nodes.get(0).stop();
//Failover from master to slave
try {
RedisClient.create(masterConfig).connect().sync(new RedisStrictCommand<Void>("DEBUG", "SEGFAULT"));
} catch (RedisTimeoutException e) {
// node goes down, so this command times out waiting for the response
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//Re-introduce master as slave like kubernetes would do
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6380)
.run();
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis:6.2.13")
.withNetwork(nodes.get(1).getNetwork())
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "slave0")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("slave2")
.withExposedPorts(6379);
nodes.add(slave);
slave.start();
System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client ");
Thread.sleep(java.time.Duration.ofSeconds(10).toMillis());
System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client ");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(exceptionDetected.get()).isFalse();
assertThat(status.peekLast()).isEqualTo("ok");
assertThat(exceptionDetected.get()).isFalse();
assertThat(status.peekLast()).isEqualTo("ok");
executor1.shutdown();
executor1.shutdown();
redisson.shutdown();
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
redissonClient.shutdown();
}, 1);
}
private void withSentinel(BiConsumer<List<GenericContainer<?>>, RedissonClient> callback, int slaves) throws InterruptedException {
private void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
new GenericContainer<>("bitnami/redis")
new GenericContainer<>("bitnami/redis:6.2.13")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "master")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
@ -1046,7 +932,7 @@ public class RedissonTopicTest extends RedisDockerTest {
for (int i = 0; i < slaves; i++) {
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis")
new GenericContainer<>("bitnami/redis:6.2.13")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "redis")
@ -1064,8 +950,10 @@ public class RedissonTopicTest extends RedisDockerTest {
}
GenericContainer<?> sentinel1 =
new GenericContainer<>("bitnami/redis-sentinel")
new GenericContainer<>("bitnami/redis-sentinel:6.2.13")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel1")
.withExposedPorts(26379);
sentinel1.start();
@ -1078,8 +966,10 @@ public class RedissonTopicTest extends RedisDockerTest {
nodes.add(sentinel1);
GenericContainer<?> sentinel2 =
new GenericContainer<>("bitnami/redis-sentinel")
new GenericContainer<>("bitnami/redis-sentinel:6.2.13")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel2")
.withExposedPorts(26379);
sentinel2.start();
@ -1092,8 +982,10 @@ public class RedissonTopicTest extends RedisDockerTest {
nodes.add(sentinel2);
GenericContainer<?> sentinel3 =
new GenericContainer<>("bitnami/redis-sentinel")
new GenericContainer<>("bitnami/redis-sentinel:6.2.13")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel3")
.withExposedPorts(26379);
sentinel3.start();
@ -1129,7 +1021,6 @@ public class RedissonTopicTest extends RedisDockerTest {
if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
// System.out.println("attempt " + uri + " - " + node.getNetworkAliases() + " " + node.isHostAccessible() +" " + node.isRunning());
}
if ("redis".equals(uri.getHost())
@ -1148,11 +1039,8 @@ public class RedissonTopicTest extends RedisDockerTest {
.addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
.setMasterName("mymaster");
RedissonClient redisson = Redisson.create(config);
callback.accept(nodes, redisson);
callback.accept(nodes, config);
redisson.shutdown();
nodes.forEach(n -> n.stop());
network.close();
}
@ -1160,10 +1048,11 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testReattachInSentinel() throws Exception {
withSentinel((nodes, redisson) -> {
withSentinel((nodes, config) -> {
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RedissonClient redisson = Redisson.create(config);
RTopic topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {
@ -1206,6 +1095,7 @@ public class RedissonTopicTest extends RedisDockerTest {
await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
assertThat(executed.get()).isTrue();
redisson.shutdown();
}, 2);
}
@ -1493,117 +1383,83 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testReattachInSentinel3() throws Exception {
RedisRunner.RedisProcess master = new RedisRunner()
.nosave()
.randomDir()
.port(6400)
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6400)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6400, 2)
.sentinelDownAfterMilliseconds("myMaster", 750)
.sentinelFailoverTimeout("myMaster", 1250)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6400, 2)
.sentinelDownAfterMilliseconds("myMaster", 750)
.sentinelFailoverTimeout("myMaster", 1250)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6400, 2)
.sentinelDownAfterMilliseconds("myMaster", 750)
.sentinelFailoverTimeout("myMaster", 1250)
.run();
Thread.sleep(1000);
Config config = new Config();
config.useSentinelServers()
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster")
.setSubscriptionsPerConnection(20)
.setSubscriptionConnectionPoolSize(200);
RedissonClient redisson = Redisson.create(config);
ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5);
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
Deque<String> status = new ConcurrentLinkedDeque<>();
Runnable rLockPayload =
() -> {
try {
Integer randomLock = ThreadLocalRandom.current().nextInt(100);
RLock lock = redisson.getLock(randomLock.toString());
lock.lock(10, TimeUnit.SECONDS);
lock.unlock();
RTopic t = redisson.getTopic("topic_" + randomLock);
int s = t.addListener(new StatusListener() {
@Override
public void onSubscribe(String channel) {
withSentinel((nodes, config) -> {
config.useSentinelServers()
.setSubscriptionsPerConnection(20)
.setSubscriptionConnectionPoolSize(200);
RedissonClient redissonClient = Redisson.create(config);
ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5);
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
Deque<String> status = new ConcurrentLinkedDeque<>();
Runnable rLockPayload =
() -> {
try {
Integer randomLock = ThreadLocalRandom.current().nextInt(100);
RLock lock = redissonClient.getLock(randomLock.toString());
lock.lock(10, TimeUnit.SECONDS);
lock.unlock();
RTopic t = redissonClient.getTopic("topic_" + randomLock);
int s = t.addListener(new StatusListener() {
@Override
public void onSubscribe(String channel) {
}
@Override
public void onUnsubscribe(String channel) {
}
});
t.removeListener(s);
status.add("ok");
} catch (Exception e) {
if (e.getMessage().contains("READONLY")) {
// skip
return;
}
@Override
public void onUnsubscribe(String channel) {
status.add("failed");
if (e.getCause() != null
&& e.getCause().getMessage().contains("slaves were synced")) {
return;
}
});
t.removeListener(s);
status.add("ok");
} catch (Exception e) {
status.add("failed");
if (e.getCause().getMessage().contains("slaves were synced")) {
return;
e.printStackTrace();
exceptionDetected.set(true);
}
e.printStackTrace();
exceptionDetected.set(true);
}
};
};
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS);
Thread.sleep(java.time.Duration.ofSeconds(10).toMillis());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
master.stop();
nodes.get(0).stop();
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(exceptionDetected.get()).isFalse();
assertThat(status.peekLast()).isEqualTo("ok");
assertThat(exceptionDetected.get()).isFalse();
assertThat(status.peekLast()).isEqualTo("ok");
executor1.shutdown();
executor1.shutdown();
redisson.shutdown();
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
redissonClient.shutdown();
}, 1);
}
@Test

Loading…
Cancel
Save