Feature - Add RedissonClient.reactive() and RedissonClient.rxJava() methods #3495

pull/3505/head
Nikita Koksharov 4 years ago
parent fc3d59ffa2
commit 0cf20f92e8

@ -108,49 +108,52 @@ public class Redisson implements RedissonClient {
return new Redisson(config);
}
/**
* Create Reactive Redisson instance with default config
*
* @return Redisson instance
/*
* Use Redisson.create().rxJava() method instead
*/
@Deprecated
public static RedissonRxClient createRx() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return createRx(config);
}
/**
* Create Reactive Redisson instance with provided config
*
* @param config for Redisson
* @return Redisson instance
/*
* Use Redisson.create(config).rxJava() method instead
*/
@Deprecated
public static RedissonRxClient createRx(Config config) {
return new RedissonRx(config);
}
@Override
public RedissonRxClient rxJava() {
return new RedissonRx(connectionManager, evictionScheduler, writeBehindService, responses);
}
/**
* Create Reactive Redisson instance with default config
*
* @return Redisson instance
/*
* Use Redisson.create().reactive() method instead
*/
@Deprecated
public static RedissonReactiveClient createReactive() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return createReactive(config);
}
/**
* Create Reactive Redisson instance with provided config
*
* @param config for Redisson
* @return Redisson instance
/*
* Use Redisson.create(config).reactive() method instead
*/
@Deprecated
public static RedissonReactiveClient createReactive(Config config) {
return new RedissonReactive(config);
}
@Override
public RedissonReactiveClient reactive() {
return new RedissonReactive(connectionManager, evictionScheduler, writeBehindService, responses);
}
@Override
public <V> RTimeSeries<V> getTimeSeries(String name) {
return new RedissonTimeSeries<>(evictionScheduler, commandExecutor, name);

@ -15,11 +15,6 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.*;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
@ -30,6 +25,11 @@ import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.reactive.*;
import org.redisson.remote.ResponseEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Main infrastructure class allows to get access
* to all Redisson objects on top of Redis server.
@ -43,8 +43,7 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final EvictionScheduler evictionScheduler;
protected final CommandReactiveService commandExecutor;
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected final ConcurrentMap<String, ResponseEntry> responses;
protected RedissonReactive(Config config) {
Config configCopy = new Config(config);
@ -57,8 +56,22 @@ public class RedissonReactive implements RedissonReactiveClient {
commandExecutor = new CommandReactiveService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
responses = new ConcurrentHashMap<>();
}
protected RedissonReactive(ConnectionManager connectionManager, EvictionScheduler evictionScheduler,
WriteBehindService writeBehindService, ConcurrentMap<String, ResponseEntry> responses) {
this.connectionManager = connectionManager;
RedissonObjectBuilder objectBuilder = null;
if (connectionManager.getCfg().isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
commandExecutor = new CommandReactiveService(connectionManager, objectBuilder);
this.evictionScheduler = evictionScheduler;
this.writeBehindService = writeBehindService;
this.responses = responses;
}
public EvictionScheduler getEvictionScheduler() {
return evictionScheduler;
}

@ -41,8 +41,7 @@ public class RedissonRx implements RedissonRxClient {
protected final EvictionScheduler evictionScheduler;
protected final CommandRxExecutor commandExecutor;
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected final ConcurrentMap<String, ResponseEntry> responses;
protected RedissonRx(Config config) {
Config configCopy = new Config(config);
@ -55,8 +54,22 @@ public class RedissonRx implements RedissonRxClient {
commandExecutor = new CommandRxService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
responses = new ConcurrentHashMap<>();
}
protected RedissonRx(ConnectionManager connectionManager, EvictionScheduler evictionScheduler,
WriteBehindService writeBehindService, ConcurrentMap<String, ResponseEntry> responses) {
this.connectionManager = connectionManager;
RedissonObjectBuilder objectBuilder = null;
if (connectionManager.getCfg().isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
commandExecutor = new CommandRxService(connectionManager, objectBuilder);
this.evictionScheduler = evictionScheduler;
this.writeBehindService = writeBehindService;
this.responses = responses;
}
@Override
public <K, V> RStreamRx<K, V> getStream(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonStream<K, V>(commandExecutor, name), RStreamRx.class);

@ -1136,7 +1136,21 @@ public interface RedissonClient {
* @return LiveObjectService object
*/
RLiveObjectService getLiveObjectService();
/**
* Returns RxJava Redisson instance
*
* @return redisson instance
*/
RedissonRxClient rxJava();
/**
* Returns Reactive Redisson instance
*
* @return redisson instance
*/
RedissonReactiveClient reactive();
/**
* Shutdown Redisson instance but <b>NOT</b> Redis server
*

@ -61,7 +61,7 @@ public abstract class BaseReactiveTest {
public static RedissonReactiveClient createInstance() {
Config config = BaseTest.createConfig();
return Redisson.createReactive(config);
return Redisson.create(config).reactive();
}
}

@ -100,18 +100,14 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonReactiveClient reactive = Redisson.createReactive(config);
RedissonReactiveClient reactive = Redisson.create(config).reactive();
RBucketReactive<Object> b1 = reactive.getBucket("b1");
sync(b1.set(new MyObject()));
RSetReactive<Object> s1 = reactive.getSet("s1");
assertTrue(sync(s1.add(b1)));
assertTrue(codec == b1.getCodec());
Config config1 = new Config();
config1.setCodec(codec);
config1.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonReactiveClient reactive1 = Redisson.createReactive(config1);
RedissonReactiveClient reactive1 = Redisson.create(config).reactive();
RSetReactive<RBucketReactive> s2 = reactive1.getSet("s1");
RBucketReactive<MyObject> b2 = sync(s2.iterator(1));

@ -96,7 +96,7 @@ public class RedissonReferenceTest extends BaseTest {
b3.setAsync(b1);
batch.execute();
RBatchReactive b = Redisson.createReactive(redisson.getConfig()).createBatch();
RBatchReactive b = redisson.reactive().createBatch();
b.getBucket("b1").get();
b.getBucket("b2").get();
b.getBucket("b3").get();

@ -391,13 +391,13 @@ public class RedissonRemoteServiceTest extends BaseTest {
@Test
public void testCancelReactive() throws InterruptedException {
RedissonReactiveClient r1 = Redisson.createReactive(createConfig());
RedissonReactiveClient r1 = Redisson.create(createConfig()).reactive();
AtomicInteger iterations = new AtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
r1.getKeys().flushall();
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl(iterations), 1, executor);
RedissonReactiveClient r2 = Redisson.createReactive(createConfig());
RedissonReactiveClient r2 = Redisson.create(createConfig()).reactive();
RemoteInterfaceReactive ri = r2.getRemoteService().get(RemoteInterfaceReactive.class);
Mono<Void> f = ri.cancelMethod();
@ -444,10 +444,10 @@ public class RedissonRemoteServiceTest extends BaseTest {
@Test
public void testReactive() throws InterruptedException {
RedissonReactiveClient r1 = Redisson.createReactive(createConfig());
RedissonReactiveClient r1 = Redisson.create(createConfig()).reactive();
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
RedissonReactiveClient r2 = Redisson.createReactive(createConfig());
RedissonReactiveClient r2 = Redisson.create(createConfig()).reactive();
RemoteInterfaceReactive ri = r2.getRemoteService().get(RemoteInterfaceReactive.class);
Mono<Void> f = ri.voidMethod("someName", 100L);
@ -460,11 +460,11 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
@Test
public void testRx() throws InterruptedException {
RedissonRxClient r1 = Redisson.createRx(createConfig());
public void testRx() {
RedissonRxClient r1 = Redisson.create(createConfig()).rxJava();
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
RedissonRxClient r2 = Redisson.createRx(createConfig());
RedissonRxClient r2 = Redisson.create(createConfig()).rxJava();
RemoteInterfaceRx ri = r2.getRemoteService().get(RemoteInterfaceRx.class);
Completable f = ri.voidMethod("someName", 100L);

@ -70,7 +70,7 @@ public abstract class BaseRxTest {
public static RedissonRxClient createInstance() {
Config config = BaseTest.createConfig();
return Redisson.createRx(config);
return Redisson.create(config).rxJava();
}
}

@ -136,7 +136,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
.setTimeout(200)
.setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);
RedissonRxClient redisson = Redisson.createRx(config);
RedissonRxClient redisson = Redisson.create(config).rxJava();
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
RBatchRx batch = redisson.createBatch(batchOptions);
@ -204,7 +204,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
.setTimeout(1000000)
.setRetryInterval(1000000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonRxClient redisson = Redisson.createRx(config);
RedissonRxClient redisson = Redisson.create(config).rxJava();
batchOptions
.syncSlaves(1, 1, TimeUnit.SECONDS);
@ -227,7 +227,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
public void testWriteTimeout() throws InterruptedException {
Config config = BaseTest.createConfig();
config.useSingleServer().setRetryInterval(700).setTimeout(1500);
RedissonRxClient redisson = Redisson.createRx(config);
RedissonRxClient redisson = Redisson.create(config).rxJava();
RBatchRx batch = redisson.createBatch(batchOptions);
RMapCacheRx<String, String> map = batch.getMapCache("test");
@ -322,7 +322,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
config.useClusterServers()
.setTimeout(123000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonRxClient redisson = Redisson.createRx(config);
RedissonRxClient redisson = Redisson.create(config).rxJava();
batchOptions
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC)
@ -386,7 +386,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
public void testBatchBigRequest() {
Config config = BaseTest.createConfig();
config.useSingleServer().setTimeout(15000);
RedissonRxClient redisson = Redisson.createRx(config);
RedissonRxClient redisson = Redisson.create(config).rxJava();
RBatchRx batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 210; i++) {

@ -35,7 +35,7 @@ public class RedissonReactiveTransactionContextConfig {
@Bean
public RedissonReactiveClient redisson() {
return Redisson.createReactive(BaseTest.createConfig());
return Redisson.create(BaseTest.createConfig()).reactive();
}
@PreDestroy

Loading…
Cancel
Save