refactoring

pull/5457/head
Nikita Koksharov 1 year ago
parent f821ff0146
commit 863025f9b3

@ -12,7 +12,7 @@ import org.redisson.config.Config;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
public abstract class BaseConcurrentTest extends BaseTest { public abstract class BaseConcurrentTest extends RedisDockerTest {
protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
@ -59,39 +59,9 @@ public abstract class BaseConcurrentTest extends BaseTest {
Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
} }
protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<Integer, RedissonClient>();
for (int i = 0; i < iterations; i++) {
instances.put(i, BaseTest.createInstance());
}
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
final int n = i;
executor.execute(() -> runnable.run(instances.get(n)));
}
executor.shutdown();
Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
System.out.println("multi: " + (System.currentTimeMillis() - watch));
executor = Executors.newCachedThreadPool();
for (final RedissonClient redisson : instances.values()) {
executor.execute(() -> redisson.shutdown());
}
executor.shutdown();
Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
}
protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Single Instance Concurrent Job Interation: " + iterations); System.out.println("Single Instance Concurrent Job Interation: " + iterations);
final RedissonClient r = BaseTest.createInstance(); final RedissonClient r = createInstance();
long watch = System.currentTimeMillis(); long watch = System.currentTimeMillis();
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

@ -20,12 +20,11 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
public void testAdd_SingleInstance() throws InterruptedException { public void testAdd_SingleInstance() throws InterruptedException {
final String name = "testAdd_SingleInstance"; final String name = "testAdd_SingleInstance";
RedissonClient r = BaseTest.createInstance(); RSortedSet<Integer> map = redisson.getSortedSet(name);
RSortedSet<Integer> map = r.getSortedSet(name);
map.clear(); map.clear();
int length = 5000; int length = 5000;
final List<Integer> elements = new ArrayList<Integer>(); final List<Integer> elements = new ArrayList<>();
for (int i = 1; i < length + 1; i++) { for (int i = 1; i < length + 1; i++) {
elements.add(i); elements.add(i);
} }
@ -43,15 +42,13 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
assertThat(map).containsExactly(p); assertThat(map).containsExactly(p);
map.clear(); map.clear();
r.shutdown();
} }
@Test @Test
public void testAddRemove_SingleInstance() throws InterruptedException, NoSuchAlgorithmException { public void testAddRemove_SingleInstance() throws InterruptedException, NoSuchAlgorithmException {
final String name = "testAddNegative_SingleInstance"; final String name = "testAddNegative_SingleInstance";
RedissonClient r = BaseTest.createInstance(); RSortedSet<Integer> map = redisson.getSortedSet(name);
RSortedSet<Integer> map = r.getSortedSet(name);
map.clear(); map.clear();
int length = 1000; int length = 1000;
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
@ -79,8 +76,6 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
Assertions.fail(); Assertions.fail();
} }
} }
r.shutdown();
} }
} }

@ -7,9 +7,8 @@ import org.redisson.api.RedissonClient;
import org.redisson.client.*; import org.redisson.client.*;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer; import org.testcontainers.containers.GenericContainer;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -81,19 +80,13 @@ public class RedissonLockTest extends BaseConcurrentTest {
} }
@Test @Test
public void testSubscriptionsPerConnection() throws InterruptedException, IOException { public void testSubscriptionsPerConnection() throws InterruptedException {
RedisRunner.RedisProcess runner = new RedisRunner()
.port(RedisRunner.findFreePort())
.nosave()
.randomDir()
.run();
Config config = new Config(); Config config = new Config();
config.useSingleServer() config.useSingleServer()
.setSubscriptionConnectionPoolSize(1) .setSubscriptionConnectionPoolSize(1)
.setSubscriptionConnectionMinimumIdleSize(1) .setSubscriptionConnectionMinimumIdleSize(1)
.setSubscriptionsPerConnection(1) .setSubscriptionsPerConnection(1)
.setAddress(runner.getRedisServerAddressAndPort()); .setAddress(redisson.getConfig().useSingleServer().getAddress());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
ExecutorService e = Executors.newFixedThreadPool(32); ExecutorService e = Executors.newFixedThreadPool(32);
@ -124,7 +117,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
assertThat(errors.get()).isZero(); assertThat(errors.get()).isZero();
RedisClientConfig cc = new RedisClientConfig(); RedisClientConfig cc = new RedisClientConfig();
cc.setAddress(runner.getRedisServerAddressAndPort()); cc.setAddress(redisson.getConfig().useSingleServer().getAddress());
RedisClient c = RedisClient.create(cc); RedisClient c = RedisClient.create(cc);
RedisConnection ccc = c.connect(); RedisConnection ccc = c.connect();
List<String> channels = ccc.sync(RedisCommands.PUBSUB_CHANNELS); List<String> channels = ccc.sync(RedisCommands.PUBSUB_CHANNELS);
@ -132,20 +125,13 @@ public class RedissonLockTest extends BaseConcurrentTest {
c.shutdown(); c.shutdown();
redisson.shutdown(); redisson.shutdown();
runner.stop();
} }
@Test @Test
public void testSinglePubSub() throws IOException, InterruptedException, ExecutionException { public void testSinglePubSub() throws InterruptedException, ExecutionException {
RedisRunner.RedisProcess runner = new RedisRunner()
.port(RedisRunner.findFreePort())
.nosave()
.randomDir()
.run();
Config config = new Config(); Config config = new Config();
config.useSingleServer() config.useSingleServer()
.setAddress(runner.getRedisServerAddressAndPort()) .setAddress(redisson.getConfig().useSingleServer().getAddress())
.setSubscriptionConnectionPoolSize(1) .setSubscriptionConnectionPoolSize(1)
.setSubscriptionsPerConnection(1); .setSubscriptionsPerConnection(1);
ExecutorService executorService = Executors.newFixedThreadPool(4); ExecutorService executorService = Executors.newFixedThreadPool(4);
@ -165,7 +151,6 @@ public class RedissonLockTest extends BaseConcurrentTest {
assertThat(hasFails).isFalse(); assertThat(hasFails).isFalse();
redissonClient.shutdown(); redissonClient.shutdown();
runner.stop();
} }
@Test @Test
@ -193,23 +178,25 @@ public class RedissonLockTest extends BaseConcurrentTest {
@Test @Test
public void testRedisFailed() { public void testRedisFailed() {
Assertions.assertThrows(WriteRedisConnectionException.class, () -> { GenericContainer<?> redis =
RedisRunner.RedisProcess master = new RedisRunner() new GenericContainer<>("redis:7.2")
.port(6377) .withExposedPorts(6379);
.nosave() redis.start();
.randomDir()
.run();
Config config = new Config(); Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6377"); config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
Assertions.assertThrows(WriteRedisConnectionException.class, () -> {
RLock lock = redisson.getLock("myLock"); RLock lock = redisson.getLock("myLock");
// kill RedisServer while main thread is sleeping. // kill RedisServer while main thread is sleeping.
master.stop(); redis.stop();
Thread.sleep(3000); Thread.sleep(3000);
lock.tryLock(5, 10, TimeUnit.SECONDS); lock.tryLock(5, 10, TimeUnit.SECONDS);
}); });
redisson.shutdown();
} }
@Test @Test
@ -223,7 +210,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
lock.tryLock(3, TimeUnit.SECONDS); lock.tryLock(3, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - startTime).isBetween(2990L, 3100L); assertThat(System.currentTimeMillis() - startTime).isBetween(2990L, 3350L);
} }
@Test @Test
@ -289,35 +276,13 @@ public class RedissonLockTest extends BaseConcurrentTest {
@Test @Test
public void testInCluster() throws Exception { public void testInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); testInCluster(redisson -> {
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); RLock lock = redisson.getLock("myLock");
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); lock.lock();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); assertThat(lock.isLocked()).isTrue();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); lock.unlock();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); assertThat(lock.isLocked()).isFalse();
});
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("myLock");
lock.lock();
assertThat(lock.isLocked()).isTrue();
lock.unlock();
assertThat(lock.isLocked()).isFalse();
redisson.shutdown();
process.shutdown();
} }

@ -23,43 +23,35 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest { public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
@Test @Test
public void testGetInClusterNameMapper() throws RedisRunner.FailedToStartRedisException, IOException, InterruptedException { public void testGetInClusterNameMapper() throws RedisRunner.FailedToStartRedisException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); testInCluster(client -> {
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); Config config = client.getConfig();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); config.useClusterServers()
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); .setNameMapper(new NameMapper() {
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); @Override
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); public String map(String name) {
return "test::" + name;
ClusterRunner clusterRunner = new ClusterRunner() }
.addNode(master1, slave1)
.addNode(master2, slave2) @Override
.addNode(master3, slave3); public String unmap(String name) {
ClusterRunner.ClusterProcesses process = clusterRunner.run(); return name.replace("test::", "");
}
Config config = new Config(); });
config.useClusterServers() RedissonClient redisson = Redisson.create(config);
.setNameMapper(new NameMapper() {
@Override RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("semaphore");
public String map(String name) { s.trySetPermits(1);
return "test::" + name; try {
} String v = s.acquire();
s.release(v);
@Override } catch (InterruptedException e) {
public String unmap(String name) { throw new RuntimeException(e);
return name.replace("test::", ""); }
}
})
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("semaphore"); redisson.shutdown();
s.trySetPermits(1);
String v = s.acquire();
s.release(v);
redisson.shutdown(); });
process.shutdown();
} }
@Test @Test

@ -50,7 +50,7 @@ public class RedissonRateLimiterTest extends RedisDockerTest {
Thread.sleep(1000); Thread.sleep(1000);
} }
assertThat(sizes.stream().filter(s -> s == rate).count()).isGreaterThan(16); assertThat(sizes.stream().filter(s -> s == rate).count()).isGreaterThanOrEqualTo(16);
e.shutdownNow(); e.shutdownNow();
} }

@ -161,30 +161,14 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
} }
@Test @Test
public void testInCluster() throws Exception { public void testInCluster() {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); testInCluster(redisson -> {
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); RReadWriteLock s = redisson.getReadWriteLock("1234");
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); s.writeLock().lock();
s.readLock().lock();
ClusterRunner clusterRunner = new ClusterRunner() s.readLock().unlock();
.addNode(master1) s.writeLock().unlock();
.addNode(master2) });
.addNode(master3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RReadWriteLock s = redisson.getReadWriteLock("1234");
s.writeLock().lock();
s.readLock().lock();
s.readLock().unlock();
s.writeLock().unlock();
redisson.shutdown();
process.shutdown();
} }
@Test @Test

@ -9,6 +9,7 @@ import org.redisson.api.RedissonClient;
import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.WriteRedisConnectionException;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.connection.balancer.RandomLoadBalancer;
import org.testcontainers.containers.GenericContainer;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
@ -56,24 +57,26 @@ public class RedissonSpinLockTest extends BaseConcurrentTest {
} }
@Test @Test
public void testRedisFailed() throws IOException, InterruptedException { public void testRedisFailed() {
RedisRunner.RedisProcess master = new RedisRunner() GenericContainer<?> redis =
.port(6377) new GenericContainer<>("redis:7.2")
.nosave() .withExposedPorts(6379);
.randomDir() redis.start();
.run();
Config config = new Config(); Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6377"); config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
Assertions.assertThrows(WriteRedisConnectionException.class, () -> { Assertions.assertThrows(WriteRedisConnectionException.class, () -> {
RLock lock = redisson.getSpinLock("myLock"); RLock lock = redisson.getSpinLock("myLock");
// kill RedisServer while main thread is sleeping. // kill RedisServer while main thread is sleeping.
master.stop(); redis.stop();
Thread.sleep(3000); Thread.sleep(3000);
lock.tryLock(5, 10, TimeUnit.SECONDS); lock.tryLock(5, 10, TimeUnit.SECONDS);
}); });
redisson.shutdown();
} }
@Test @Test
@ -144,38 +147,20 @@ public class RedissonSpinLockTest extends BaseConcurrentTest {
@Test @Test
public void testInCluster() throws Exception { public void testInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); testInCluster(client -> {
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); Config config = client.getConfig();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); config.setSlavesSyncTimeout(3000);
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config(); RedissonClient redisson = Redisson.create(config);
config.useClusterServers() RLock lock = redisson.getSpinLock("myLock");
.setLoadBalancer(new RandomLoadBalancer()) lock.lock();
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); assertThat(lock.isLocked()).isTrue();
RedissonClient redisson = Redisson.create(config); lock.unlock();
assertThat(lock.isLocked()).isFalse();
RLock lock = redisson.getSpinLock("myLock"); redisson.shutdown();
lock.lock(); });
assertThat(lock.isLocked()).isTrue();
lock.unlock();
assertThat(lock.isLocked()).isFalse();
redisson.shutdown();
process.shutdown();
} }
@Test @Test
public void testAutoExpire() throws InterruptedException { public void testAutoExpire() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);

Loading…
Cancel
Save