refactoring

pull/5732/head
Nikita Koksharov 12 months ago
parent 24b114339e
commit 86fd2d3e6c

@ -22,7 +22,6 @@ import org.redisson.api.*;
import org.redisson.api.options.*;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonCodec;
import org.redisson.command.CommandAsyncService;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
@ -92,7 +91,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RStreamReactive<K, V> getStream(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonStream<K, V>(params.getCodec(), ca, params.getName()), RStreamReactive.class);
}
@ -110,7 +109,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RSearchReactive getSearch(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonSearch(params.getCodec(), ca), RSearchReactive.class);
}
@ -155,7 +154,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RRateLimiterReactive getRateLimiter(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonRateLimiter(ca, params.getName()), RRateLimiterReactive.class);
}
@ -220,7 +219,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLockReactive getLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonLock(ca, params.getName()), RLockReactive.class);
}
@ -244,7 +243,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RFencedLockReactive getFencedLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonFencedLock lock = new RedissonFencedLock(ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, lock, RFencedLockReactive.class);
}
@ -275,7 +274,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RCountDownLatchReactive getCountDownLatch(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonCountDownLatch(ca, params.getName()), RCountDownLatchReactive.class);
}
@ -316,7 +315,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBucketReactive<V> getBucket(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonBucket<V>(params.getCodec(), ca, params.getName()), RBucketReactive.class);
}
@ -334,7 +333,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBucketsReactive getBuckets(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBuckets(params.getCodec(), ca), RBucketsReactive.class);
}
@ -360,7 +359,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RJsonBucketReactive<V> getJsonBucket(JsonBucketOptions<V> options) {
JsonBucketParams<V> params = (JsonBucketParams<V>) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonJsonBucket<V>(params.getCodec(), ca, params.getName()), RJsonBucketReactive.class);
}
@ -378,7 +377,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonHyperLogLog<V>(params.getCodec(), ca, params.getName()), RHyperLogLogReactive.class);
}
@ -391,7 +390,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RIdGeneratorReactive getIdGenerator(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonIdGenerator(ca, params.getName()), RIdGeneratorReactive.class);
}
@ -541,7 +540,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RSetReactive<V> getSet(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonSet<V> set = new RedissonSet<V>(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(set, this), RSetReactive.class);
@ -578,7 +577,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLexSortedSetReactive getLexSortedSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonLexSortedSet set = new RedissonLexSortedSet(ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonLexSortedSetReactive(set),
@ -602,7 +601,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RShardedTopicReactive getShardedTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonShardedTopic topic = new RedissonShardedTopic(params.getCodec(), ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonTopicReactive(topic), RShardedTopicReactive.class);
@ -625,7 +624,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RTopicReactive getTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonTopic topic = new RedissonTopic(params.getCodec(), ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonTopicReactive(topic), RTopicReactive.class);
@ -648,7 +647,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RReliableTopicReactive getReliableTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonReliableTopic topic = new RedissonReliableTopic(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonReliableTopicReactive(topic), RReliableTopicReactive.class);
@ -667,7 +666,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RPatternTopicReactive getPatternTopic(PatternTopicOptions options) {
PatternTopicParams params = (PatternTopicParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonPatternTopic(params.getCodec(), ca, params.getPattern()), RPatternTopicReactive.class);
}
@ -705,7 +704,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RRingBufferReactive<V> getRingBuffer(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonRingBuffer<V>(params.getCodec(), ca, params.getName(), null), RRingBufferReactive.class);
}
@ -727,7 +726,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonBlockingQueueReactive<V>(queue), RBlockingQueueReactive.class);
@ -770,7 +769,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V, L> RTimeSeriesReactive<V, L> getTimeSeries(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<>(params.getCodec(), evictionScheduler, ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V, L>(timeSeries, this), RTimeSeriesReactive.class);
@ -793,7 +792,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RSetCacheReactive<V> getSetCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RSetCache<V> set = new RedissonSetCache<V>(params.getCodec(), evictionScheduler, ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(set, this), RSetCacheReactive.class);
@ -807,7 +806,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RAtomicLongReactive getAtomicLong(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonAtomicLong(ca, params.getName()), RAtomicLongReactive.class);
}
@ -820,7 +819,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RAtomicDoubleReactive getAtomicDouble(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(ca, params.getName()), RAtomicDoubleReactive.class);
}
@ -851,7 +850,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RRemoteService getRemoteService(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
String executorId = connectionManager.getServiceManager().getId();
if (params.getCodec() != null && params.getCodec() != connectionManager.getServiceManager().getCfg().getCodec()) {
executorId = executorId + ":" + params.getName();
@ -867,7 +866,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBitSetReactive getBitSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(ca, params.getName()), RBitSetReactive.class);
}
@ -884,7 +883,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RFunctionReactive getFunction(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(ca, params.getCodec()), RFunctionReactive.class);
}
@ -901,7 +900,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RScriptReactive getScript(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonScript(ca, params.getCodec()), RScriptReactive.class);
}
@ -1065,7 +1064,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBlockingDequeReactive<V> getBlockingDeque(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, deque,
new RedissonBlockingDequeReactive<V>(deque), RBlockingDequeReactive.class);
@ -1092,7 +1091,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RTransferQueueReactive<V> getTransferQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ca = new CommandAsyncService(commandExecutor, params);
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
String remoteName = RedissonObject.suffixName(params.getName(), "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(params.getCodec(), ca, params.getName(), service);

@ -15,14 +15,13 @@
*/
package org.redisson;
import org.redisson.api.*;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.MapCacheOptions;
import org.redisson.api.MapOptions;
import org.redisson.api.*;
import org.redisson.api.options.*;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonCodec;
import org.redisson.command.CommandAsyncService;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
@ -91,7 +90,7 @@ public class RedissonRx implements RedissonRxClient {
PlainParams params = (PlainParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonStream<K, V>(params.getCodec(), new CommandAsyncService(commandExecutor, params), params.getName()), RStreamRx.class);
new RedissonStream<K, V>(params.getCodec(), new CommandRxService(commandExecutor, params), params.getName()), RStreamRx.class);
}
@Override
@ -108,7 +107,7 @@ public class RedissonRx implements RedissonRxClient {
public RSearchRx getSearch(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonSearch(params.getCodec(), new CommandAsyncService(commandExecutor, params)), RSearchRx.class);
new RedissonSearch(params.getCodec(), new CommandRxService(commandExecutor, params)), RSearchRx.class);
}
@Override
@ -128,7 +127,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RGeoRx<V> getGeo(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonScoredSortedSet<V> set = new RedissonScoredSortedSet<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, new RedissonGeo<V>(params.getCodec(), ce, params.getName(), null),
new RedissonScoredSortedSetRx<V>(set), RGeoRx.class);
@ -143,7 +142,7 @@ public class RedissonRx implements RedissonRxClient {
public RLockRx getFairLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonFairLock(new CommandAsyncService(commandExecutor, params), params.getName()), RLockRx.class);
new RedissonFairLock(new CommandRxService(commandExecutor, params), params.getName()), RLockRx.class);
}
@Override
@ -155,7 +154,7 @@ public class RedissonRx implements RedissonRxClient {
public RRateLimiterRx getRateLimiter(CommonOptions options) {
CommonParams params = (CommonParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonRateLimiter(new CommandAsyncService(commandExecutor, params), params.getName()), RRateLimiterRx.class);
new RedissonRateLimiter(new CommandRxService(commandExecutor, params), params.getName()), RRateLimiterRx.class);
}
@Override
@ -182,7 +181,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RSemaphoreRx getSemaphore(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonSemaphore(ce, params.getName()), RSemaphoreRx.class);
}
@ -194,7 +193,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor,
new RedissonPermitExpirableSemaphore(ce, params.getName()), RPermitExpirableSemaphoreRx.class);
}
@ -219,7 +218,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RLockRx getLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonLock(ce, params.getName()), RLockRx.class);
}
@ -243,7 +242,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RFencedLockRx getFencedLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonFencedLock lock = new RedissonFencedLock(ce, params.getName());
return RxProxyBuilder.create(commandExecutor, lock, RFencedLockRx.class);
}
@ -274,7 +273,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RCountDownLatchRx getCountDownLatch(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonCountDownLatch(ce, params.getName()), RCountDownLatchRx.class);
}
@ -353,7 +352,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RBucketsRx getBuckets(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonBuckets(params.getCodec(), ce), RBucketsRx.class);
}
@ -365,7 +364,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RJsonBucketRx<V> getJsonBucket(JsonBucketOptions<V> options) {
JsonBucketParams<V> params = (JsonBucketParams<V>) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonJsonBucket<>(params.getCodec(), ce, params.getName()), RJsonBucketRx.class);
}
@ -382,7 +381,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RHyperLogLogRx<V> getHyperLogLog(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonHyperLogLog<V>(params.getCodec(), ce, params.getName()), RHyperLogLogRx.class);
}
@ -394,7 +393,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RIdGeneratorRx getIdGenerator(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonIdGenerator(ce, params.getName()), RIdGeneratorRx.class);
}
@ -415,7 +414,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RListRx<V> getList(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonList<V> list = new RedissonList<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, list,
new RedissonListRx<V>(list), RListRx.class);
@ -438,7 +437,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RListMultimapRx<K, V> getListMultimap(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonListMultimap<K, V> listMultimap = new RedissonListMultimap<>(params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, listMultimap,
new RedissonListMultimapRx<K, V>(listMultimap, commandExecutor), RListMultimapRx.class);
@ -553,7 +552,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RSetRx<V> getSet(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonSet<V> set = new RedissonSet<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetRx<V>(set, this), RSetRx.class);
@ -576,7 +575,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RScoredSortedSetRx<V> getScoredSortedSet(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonScoredSortedSet<V> set = new RedissonScoredSortedSet<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonScoredSortedSetRx<>(set), RScoredSortedSetRx.class);
@ -592,7 +591,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RLexSortedSetRx getLexSortedSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonLexSortedSet set = new RedissonLexSortedSet(ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonLexSortedSetRx(set), RLexSortedSetRx.class);
@ -613,7 +612,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RShardedTopicRx getShardedTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RShardedTopic topic = new RedissonShardedTopic(params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RShardedTopicRx.class);
}
@ -633,7 +632,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RTopicRx getTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RTopic topic = new RedissonTopic(params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class);
}
@ -651,7 +650,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RReliableTopicRx getReliableTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor,
new RedissonReliableTopic(params.getCodec(), ce, params.getName(), null), RReliableTopicRx.class);
}
@ -669,7 +668,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RPatternTopicRx getPatternTopic(PatternTopicOptions options) {
PatternTopicParams params = (PatternTopicParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(params.getCodec(), ce, params.getPattern()), RPatternTopicRx.class);
}
@ -688,7 +687,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RQueueRx<V> getQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonQueue<V>(params.getCodec(), ce, params.getName(), null),
new RedissonListRx<V>(new RedissonList<V>(params.getCodec(), ce, params.getName(), null)), RQueueRx.class);
}
@ -706,7 +705,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RRingBufferRx<V> getRingBuffer(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor,
new RedissonRingBuffer<V>(params.getCodec(), ce, params.getName(), null), RRingBufferRx.class);
}
@ -728,7 +727,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RBlockingQueueRx<V> getBlockingQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonBlockingQueueRx<V>(queue), RBlockingQueueRx.class);
@ -751,7 +750,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RDequeRx<V> getDeque(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonDeque<V> queue = new RedissonDeque<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonListRx<V>(queue), RDequeRx.class);
@ -774,7 +773,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V, L> RTimeSeriesRx<V, L> getTimeSeries(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<>(params.getCodec(), evictionScheduler, ce, params.getName());
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<>(timeSeries, this), RTimeSeriesRx.class);
@ -797,7 +796,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RSetCacheRx<V> getSetCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RSetCache<V> set = new RedissonSetCache<V>(params.getCodec(), evictionScheduler, ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheRx<V>(set, this), RSetCacheRx.class);
@ -811,7 +810,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RAtomicLongRx getAtomicLong(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonAtomicLong(ce, params.getName()), RAtomicLongRx.class);
}
@ -823,7 +822,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RAtomicDoubleRx getAtomicDouble(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(ce, params.getName()), RAtomicDoubleRx.class);
}
@ -870,7 +869,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RBitSetRx getBitSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonBitSet(ce, params.getName()), RBitSetRx.class);
}
@ -887,7 +886,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RFunctionRx getFunction(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonFuction(ce, params.getCodec()), RFunctionRx.class);
}
@ -904,7 +903,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RScriptRx getScript(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
return RxProxyBuilder.create(commandExecutor, new RedissonScript(ce, params.getCodec()), RScriptRx.class);
}
@ -1068,7 +1067,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RBlockingDequeRx<V> getBlockingDeque(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, deque,
new RedissonBlockingDequeRx<V>(deque), RBlockingDequeRx.class);
@ -1097,7 +1096,7 @@ public class RedissonRx implements RedissonRxClient {
PlainParams params = (PlainParams) options;
String remoteName = RedissonObject.suffixName(params.getName(), "remoteService");
RRemoteService service = getRemoteService(remoteName);
CommandAsyncService ce = new CommandAsyncService(commandExecutor, params);
CommandRxService ce = new CommandRxService(commandExecutor, params);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(params.getCodec(), ce, params.getName(), service);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueRx<V>(queue), RTransferQueueRx.class);

Loading…
Cancel
Save