From 4c44e1b588f09cbf4d08791d5b24977a32b8917e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 21 Nov 2023 14:20:15 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/RedissonTopicTest.java | 450 ++++++------------ 1 file changed, 153 insertions(+), 297 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index bf74056ff..88904258e 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -824,211 +824,97 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testResubscriptionAfterFailover() throws Exception { -// withSentinel((nodes, redisson) -> { -// Config config = redisson.getConfig(); -// config.useSentinelServers() -// .setSubscriptionsPerConnection(20) -// .setSubscriptionConnectionPoolSize(200); -// -// RedissonClient redissonClient = Redisson.create(config); -// -// ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5); -// -// AtomicBoolean exceptionDetected = new AtomicBoolean(false); -// -// Deque status = new ConcurrentLinkedDeque<>(); -// Runnable rLockPayload = -// () -> { -// try { -// Integer randomLock = ThreadLocalRandom.current().nextInt(100); -// RLock lock = redissonClient.getLock(randomLock.toString()); -// lock.lock(10, TimeUnit.SECONDS); -// lock.unlock(); -// status.add("ok"); -// } catch (Exception e) { -// status.add("failed"); -// if (e.getCause().getMessage().contains("slaves were synced")) { -// return; -// } -// e.printStackTrace(); -// exceptionDetected.set(true); -// } -// }; -// -// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); -// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); -// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); -// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); -// executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); -// -// try { -// Thread.sleep(Duration.ofSeconds(10).toMillis()); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// -// RedisClientConfig masterConfig = new RedisClientConfig().setAddress("redis://127.0.0.1:" + nodes.get(0).getFirstMappedPort()); -// -// //Failover from master to slave -// try { -// RedisClient.create(masterConfig).connect().sync(RedisCommands.SHUTDOWN); -// } catch (RedisTimeoutException e) { -// // node goes down, so this command times out waiting for the response -// } -// -// GenericContainer slave = -// new GenericContainer<>("bitnami/redis") -//// .withNetwork(nodes.get(1).getNetwork()) -// .withEnv("REDIS_REPLICATION_MODE", "slave") -// .withEnv("REDIS_MASTER_HOST", "redis") -// .withEnv("ALLOW_EMPTY_PASSWORD", "yes") -// .withNetworkAliases("slave2") -// .withExposedPorts(6379); -// nodes.add(2, slave); -// slave.setPortBindings(Arrays.asList("6312:6379")); -// slave.start(); -// -// System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client "); -// try { -// Thread.sleep(Duration.ofSeconds(10).toMillis()); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// -// assertThat(exceptionDetected.get()).isFalse(); -// assertThat(status.peekLast()).isEqualTo("ok"); -// -// executor1.shutdown(); -// -// redissonClient.shutdown(); -// -// }, 1); - - RedisRunner.RedisProcess master = new RedisRunner() - .nosave() - .randomDir() - .port(6400) - .run(); - - RedisRunner.RedisProcess slave1 = new RedisRunner() - .port(6380) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6400) - .run(); - - RedisRunner.RedisProcess sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6400, 2) - .sentinelDownAfterMilliseconds("myMaster", 750) - .sentinelFailoverTimeout("myMaster", 1250) - .run(); - - RedisRunner.RedisProcess sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6400, 2) - .sentinelDownAfterMilliseconds("myMaster", 750) - .sentinelFailoverTimeout("myMaster", 1250) - .run(); - - RedisRunner.RedisProcess sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6400, 2) - .sentinelDownAfterMilliseconds("myMaster", 750) - .sentinelFailoverTimeout("myMaster", 1250) - .run(); - - Thread.sleep(1000); - - Config config = new Config(); - config.useSentinelServers() - .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster") - .setSubscriptionsPerConnection(20) - .setSubscriptionConnectionPoolSize(200); - - RedissonClient redisson = Redisson.create(config); - - ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5); - - AtomicBoolean exceptionDetected = new AtomicBoolean(false); + withSentinel((nodes, config) -> { + config.useSentinelServers() + .setSubscriptionsPerConnection(20) + .setSubscriptionConnectionPoolSize(200); + + RedissonClient redissonClient = Redisson.create(config); + + ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5); + + AtomicBoolean exceptionDetected = new AtomicBoolean(false); + + Deque status = new ConcurrentLinkedDeque<>(); + Runnable rLockPayload = + () -> { + try { + Integer randomLock = ThreadLocalRandom.current().nextInt(100); + RLock lock = redissonClient.getLock(randomLock.toString()); + lock.lock(10, TimeUnit.SECONDS); + lock.unlock(); + status.add("ok"); + } catch (Exception e) { + if (e.getMessage().contains("READONLY")) { + // skip + return; + } - Deque status = new ConcurrentLinkedDeque<>(); - Runnable rLockPayload = - () -> { - try { - Integer randomLock = ThreadLocalRandom.current().nextInt(100); - RLock lock = redisson.getLock(randomLock.toString()); - lock.lock(10, TimeUnit.SECONDS); - lock.unlock(); - status.add("ok"); - } catch (Exception e) { - status.add("failed"); - if (e.getCause().getMessage().contains("slaves were synced")) { - return; + status.add("failed"); + if (e.getCause() != null + && e.getCause().getMessage().contains("slaves were synced")) { + return; + } + e.printStackTrace(); + exceptionDetected.set(true); } - e.printStackTrace(); - exceptionDetected.set(true); - } - }; + }; - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - Thread.sleep(java.time.Duration.ofSeconds(10).toMillis()); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - RedisClientConfig masterConfig = new RedisClientConfig().setAddress(master.getRedisServerAddressAndPort()); + nodes.get(0).stop(); - //Failover from master to slave - try { - RedisClient.create(masterConfig).connect().sync(new RedisStrictCommand("DEBUG", "SEGFAULT")); - } catch (RedisTimeoutException e) { - // node goes down, so this command times out waiting for the response - } + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - //Re-introduce master as slave like kubernetes would do - RedisRunner.RedisProcess slave2 = new RedisRunner() - .port(6381) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6380) - .run(); + GenericContainer slave = + new GenericContainer<>("bitnami/redis:6.2.13") + .withNetwork(nodes.get(1).getNetwork()) + .withEnv("REDIS_REPLICATION_MODE", "slave") + .withEnv("REDIS_MASTER_HOST", "slave0") + .withEnv("ALLOW_EMPTY_PASSWORD", "yes") + .withNetworkAliases("slave2") + .withExposedPorts(6379); + nodes.add(slave); + slave.start(); - System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client "); - Thread.sleep(java.time.Duration.ofSeconds(10).toMillis()); + System.out.println("Failover Finished, start to see Subscribe timeouts now. Can't recover this without a refresh of redison client "); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - assertThat(exceptionDetected.get()).isFalse(); - assertThat(status.peekLast()).isEqualTo("ok"); + assertThat(exceptionDetected.get()).isFalse(); + assertThat(status.peekLast()).isEqualTo("ok"); - executor1.shutdown(); + executor1.shutdown(); - redisson.shutdown(); - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); - slave2.stop(); + redissonClient.shutdown(); + + }, 1); } - private void withSentinel(BiConsumer>, RedissonClient> callback, int slaves) throws InterruptedException { + private void withSentinel(BiConsumer>, Config> callback, int slaves) throws InterruptedException { Network network = Network.newNetwork(); List>> nodes = new ArrayList<>(); GenericContainer master = - new GenericContainer<>("bitnami/redis") + new GenericContainer<>("bitnami/redis:6.2.13") .withNetwork(network) .withEnv("REDIS_REPLICATION_MODE", "master") .withEnv("ALLOW_EMPTY_PASSWORD", "yes") @@ -1046,7 +932,7 @@ public class RedissonTopicTest extends RedisDockerTest { for (int i = 0; i < slaves; i++) { GenericContainer slave = - new GenericContainer<>("bitnami/redis") + new GenericContainer<>("bitnami/redis:6.2.13") .withNetwork(network) .withEnv("REDIS_REPLICATION_MODE", "slave") .withEnv("REDIS_MASTER_HOST", "redis") @@ -1064,8 +950,10 @@ public class RedissonTopicTest extends RedisDockerTest { } GenericContainer sentinel1 = - new GenericContainer<>("bitnami/redis-sentinel") + new GenericContainer<>("bitnami/redis-sentinel:6.2.13") .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") .withNetworkAliases("sentinel1") .withExposedPorts(26379); sentinel1.start(); @@ -1078,8 +966,10 @@ public class RedissonTopicTest extends RedisDockerTest { nodes.add(sentinel1); GenericContainer sentinel2 = - new GenericContainer<>("bitnami/redis-sentinel") + new GenericContainer<>("bitnami/redis-sentinel:6.2.13") .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") .withNetworkAliases("sentinel2") .withExposedPorts(26379); sentinel2.start(); @@ -1092,8 +982,10 @@ public class RedissonTopicTest extends RedisDockerTest { nodes.add(sentinel2); GenericContainer sentinel3 = - new GenericContainer<>("bitnami/redis-sentinel") + new GenericContainer<>("bitnami/redis-sentinel:6.2.13") .withNetwork(network) + .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000") + .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000") .withNetworkAliases("sentinel3") .withExposedPorts(26379); sentinel3.start(); @@ -1129,7 +1021,6 @@ public class RedissonTopicTest extends RedisDockerTest { if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) { return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec())); -// System.out.println("attempt " + uri + " - " + node.getNetworkAliases() + " " + node.isHostAccessible() +" " + node.isRunning()); } if ("redis".equals(uri.getHost()) @@ -1148,11 +1039,8 @@ public class RedissonTopicTest extends RedisDockerTest { .addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort()) .setMasterName("mymaster"); - RedissonClient redisson = Redisson.create(config); - - callback.accept(nodes, redisson); + callback.accept(nodes, config); - redisson.shutdown(); nodes.forEach(n -> n.stop()); network.close(); } @@ -1160,10 +1048,11 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattachInSentinel() throws Exception { - withSentinel((nodes, redisson) -> { + withSentinel((nodes, config) -> { final AtomicBoolean executed = new AtomicBoolean(); final AtomicInteger subscriptions = new AtomicInteger(); + RedissonClient redisson = Redisson.create(config); RTopic topic = redisson.getTopic("topic"); topic.addListener(new StatusListener() { @@ -1206,6 +1095,7 @@ public class RedissonTopicTest extends RedisDockerTest { await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); assertThat(executed.get()).isTrue(); + redisson.shutdown(); }, 2); } @@ -1493,117 +1383,83 @@ public class RedissonTopicTest extends RedisDockerTest { @Test public void testReattachInSentinel3() throws Exception { - RedisRunner.RedisProcess master = new RedisRunner() - .nosave() - .randomDir() - .port(6400) - .run(); - - RedisRunner.RedisProcess slave1 = new RedisRunner() - .port(6380) - .nosave() - .randomDir() - .slaveof("127.0.0.1", 6400) - .run(); - - RedisRunner.RedisProcess sentinel1 = new RedisRunner() - .nosave() - .randomDir() - .port(26379) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6400, 2) - .sentinelDownAfterMilliseconds("myMaster", 750) - .sentinelFailoverTimeout("myMaster", 1250) - .run(); - - RedisRunner.RedisProcess sentinel2 = new RedisRunner() - .nosave() - .randomDir() - .port(26380) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6400, 2) - .sentinelDownAfterMilliseconds("myMaster", 750) - .sentinelFailoverTimeout("myMaster", 1250) - .run(); - - RedisRunner.RedisProcess sentinel3 = new RedisRunner() - .nosave() - .randomDir() - .port(26381) - .sentinel() - .sentinelMonitor("myMaster", "127.0.0.1", 6400, 2) - .sentinelDownAfterMilliseconds("myMaster", 750) - .sentinelFailoverTimeout("myMaster", 1250) - .run(); - - Thread.sleep(1000); - - Config config = new Config(); - config.useSentinelServers() - .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster") - .setSubscriptionsPerConnection(20) - .setSubscriptionConnectionPoolSize(200); - RedissonClient redisson = Redisson.create(config); - - ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5); - - AtomicBoolean exceptionDetected = new AtomicBoolean(false); - - Deque status = new ConcurrentLinkedDeque<>(); - Runnable rLockPayload = - () -> { - try { - Integer randomLock = ThreadLocalRandom.current().nextInt(100); - RLock lock = redisson.getLock(randomLock.toString()); - lock.lock(10, TimeUnit.SECONDS); - lock.unlock(); - - RTopic t = redisson.getTopic("topic_" + randomLock); - int s = t.addListener(new StatusListener() { - @Override - public void onSubscribe(String channel) { + withSentinel((nodes, config) -> { + config.useSentinelServers() + .setSubscriptionsPerConnection(20) + .setSubscriptionConnectionPoolSize(200); + + RedissonClient redissonClient = Redisson.create(config); + + ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(5); + + AtomicBoolean exceptionDetected = new AtomicBoolean(false); + + Deque status = new ConcurrentLinkedDeque<>(); + Runnable rLockPayload = + () -> { + try { + Integer randomLock = ThreadLocalRandom.current().nextInt(100); + RLock lock = redissonClient.getLock(randomLock.toString()); + lock.lock(10, TimeUnit.SECONDS); + lock.unlock(); + + RTopic t = redissonClient.getTopic("topic_" + randomLock); + int s = t.addListener(new StatusListener() { + @Override + public void onSubscribe(String channel) { + } + + @Override + public void onUnsubscribe(String channel) { + } + }); + t.removeListener(s); + + status.add("ok"); + } catch (Exception e) { + if (e.getMessage().contains("READONLY")) { + // skip + return; } - @Override - public void onUnsubscribe(String channel) { + status.add("failed"); + if (e.getCause() != null + && e.getCause().getMessage().contains("slaves were synced")) { + return; } - }); - t.removeListener(s); - - status.add("ok"); - } catch (Exception e) { - status.add("failed"); - if (e.getCause().getMessage().contains("slaves were synced")) { - return; + e.printStackTrace(); + exceptionDetected.set(true); } - e.printStackTrace(); - exceptionDetected.set(true); - } - }; + }; - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); + executor1.scheduleAtFixedRate(rLockPayload, 100, 50, TimeUnit.MILLISECONDS); - Thread.sleep(java.time.Duration.ofSeconds(10).toMillis()); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - master.stop(); + nodes.get(0).stop(); - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + try { + TimeUnit.SECONDS.sleep(30); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - assertThat(exceptionDetected.get()).isFalse(); - assertThat(status.peekLast()).isEqualTo("ok"); + assertThat(exceptionDetected.get()).isFalse(); + assertThat(status.peekLast()).isEqualTo("ok"); - executor1.shutdown(); + executor1.shutdown(); - redisson.shutdown(); - sentinel1.stop(); - sentinel2.stop(); - sentinel3.stop(); - master.stop(); - slave1.stop(); + redissonClient.shutdown(); + + }, 1); } @Test