From 86fd2d3e6c0a866682177df2a08561a8470109ae Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 30 Mar 2024 14:51:10 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/RedissonReactive.java | 59 +++++++-------- .../main/java/org/redisson/RedissonRx.java | 75 +++++++++---------- 2 files changed, 66 insertions(+), 68 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 5fa2223c0..f796b30ac 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -22,7 +22,6 @@ import org.redisson.api.*; import org.redisson.api.options.*; import org.redisson.client.codec.Codec; import org.redisson.codec.JsonCodec; -import org.redisson.command.CommandAsyncService; import org.redisson.config.Config; import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; @@ -92,7 +91,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RStreamReactive getStream(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonStream(params.getCodec(), ca, params.getName()), RStreamReactive.class); } @@ -110,7 +109,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RSearchReactive getSearch(OptionalOptions options) { OptionalParams params = (OptionalParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonSearch(params.getCodec(), ca), RSearchReactive.class); } @@ -155,7 +154,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RRateLimiterReactive getRateLimiter(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonRateLimiter(ca, params.getName()), RRateLimiterReactive.class); } @@ -220,7 +219,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RLockReactive getLock(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonLock(ca, params.getName()), RLockReactive.class); } @@ -244,7 +243,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RFencedLockReactive getFencedLock(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonFencedLock lock = new RedissonFencedLock(ca, params.getName()); return ReactiveProxyBuilder.create(commandExecutor, lock, RFencedLockReactive.class); } @@ -275,7 +274,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RCountDownLatchReactive getCountDownLatch(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonCountDownLatch(ca, params.getName()), RCountDownLatchReactive.class); } @@ -316,7 +315,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBucketReactive getBucket(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonBucket(params.getCodec(), ca, params.getName()), RBucketReactive.class); } @@ -334,7 +333,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBucketsReactive getBuckets(OptionalOptions options) { OptionalParams params = (OptionalParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonBuckets(params.getCodec(), ca), RBucketsReactive.class); } @@ -360,7 +359,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RJsonBucketReactive getJsonBucket(JsonBucketOptions options) { JsonBucketParams params = (JsonBucketParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonJsonBucket(params.getCodec(), ca, params.getName()), RJsonBucketReactive.class); } @@ -378,7 +377,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RHyperLogLogReactive getHyperLogLog(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog(params.getCodec(), ca, params.getName()), RHyperLogLogReactive.class); } @@ -391,7 +390,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RIdGeneratorReactive getIdGenerator(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonIdGenerator(ca, params.getName()), RIdGeneratorReactive.class); } @@ -541,7 +540,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RSetReactive getSet(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonSet set = new RedissonSet(params.getCodec(), ca, params.getName(), null); return ReactiveProxyBuilder.create(commandExecutor, set, new RedissonSetReactive(set, this), RSetReactive.class); @@ -578,7 +577,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RLexSortedSetReactive getLexSortedSet(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonLexSortedSet set = new RedissonLexSortedSet(ca, params.getName(), null); return ReactiveProxyBuilder.create(commandExecutor, set, new RedissonLexSortedSetReactive(set), @@ -602,7 +601,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RShardedTopicReactive getShardedTopic(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonShardedTopic topic = new RedissonShardedTopic(params.getCodec(), ca, params.getName()); return ReactiveProxyBuilder.create(commandExecutor, topic, new RedissonTopicReactive(topic), RShardedTopicReactive.class); @@ -625,7 +624,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RTopicReactive getTopic(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonTopic topic = new RedissonTopic(params.getCodec(), ca, params.getName()); return ReactiveProxyBuilder.create(commandExecutor, topic, new RedissonTopicReactive(topic), RTopicReactive.class); @@ -648,7 +647,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RReliableTopicReactive getReliableTopic(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonReliableTopic topic = new RedissonReliableTopic(params.getCodec(), ca, params.getName(), null); return ReactiveProxyBuilder.create(commandExecutor, topic, new RedissonReliableTopicReactive(topic), RReliableTopicReactive.class); @@ -667,7 +666,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RPatternTopicReactive getPatternTopic(PatternTopicOptions options) { PatternTopicParams params = (PatternTopicParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(params.getCodec(), ca, params.getPattern()), RPatternTopicReactive.class); } @@ -705,7 +704,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RRingBufferReactive getRingBuffer(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonRingBuffer(params.getCodec(), ca, params.getName(), null), RRingBufferReactive.class); } @@ -727,7 +726,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBlockingQueueReactive getBlockingQueue(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonBlockingQueue queue = new RedissonBlockingQueue(params.getCodec(), ca, params.getName(), null); return ReactiveProxyBuilder.create(commandExecutor, queue, new RedissonBlockingQueueReactive(queue), RBlockingQueueReactive.class); @@ -770,7 +769,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RTimeSeriesReactive getTimeSeries(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RTimeSeries timeSeries = new RedissonTimeSeries<>(params.getCodec(), evictionScheduler, ca, params.getName()); return ReactiveProxyBuilder.create(commandExecutor, timeSeries, new RedissonTimeSeriesReactive(timeSeries, this), RTimeSeriesReactive.class); @@ -793,7 +792,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RSetCacheReactive getSetCache(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RSetCache set = new RedissonSetCache(params.getCodec(), evictionScheduler, ca, params.getName(), null); return ReactiveProxyBuilder.create(commandExecutor, set, new RedissonSetCacheReactive(set, this), RSetCacheReactive.class); @@ -807,7 +806,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RAtomicLongReactive getAtomicLong(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicLong(ca, params.getName()), RAtomicLongReactive.class); } @@ -820,7 +819,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RAtomicDoubleReactive getAtomicDouble(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(ca, params.getName()), RAtomicDoubleReactive.class); } @@ -851,7 +850,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RRemoteService getRemoteService(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); String executorId = connectionManager.getServiceManager().getId(); if (params.getCodec() != null && params.getCodec() != connectionManager.getServiceManager().getCfg().getCodec()) { executorId = executorId + ":" + params.getName(); @@ -867,7 +866,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBitSetReactive getBitSet(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(ca, params.getName()), RBitSetReactive.class); } @@ -884,7 +883,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RFunctionReactive getFunction(OptionalOptions options) { OptionalParams params = (OptionalParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(ca, params.getCodec()), RFunctionReactive.class); } @@ -901,7 +900,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RScriptReactive getScript(OptionalOptions options) { OptionalParams params = (OptionalParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); return ReactiveProxyBuilder.create(commandExecutor, new RedissonScript(ca, params.getCodec()), RScriptReactive.class); } @@ -1065,7 +1064,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBlockingDequeReactive getBlockingDeque(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); RedissonBlockingDeque deque = new RedissonBlockingDeque(params.getCodec(), ca, params.getName(), null); return ReactiveProxyBuilder.create(commandExecutor, deque, new RedissonBlockingDequeReactive(deque), RBlockingDequeReactive.class); @@ -1092,7 +1091,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RTransferQueueReactive getTransferQueue(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ca = new CommandAsyncService(commandExecutor, params); + CommandReactiveService ca = new CommandReactiveService(commandExecutor, params); String remoteName = RedissonObject.suffixName(params.getName(), "remoteService"); RRemoteService service = getRemoteService(remoteName); RedissonTransferQueue queue = new RedissonTransferQueue(params.getCodec(), ca, params.getName(), service); diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 16509ef52..bbace8264 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -15,14 +15,13 @@ */ package org.redisson; -import org.redisson.api.*; import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.MapCacheOptions; import org.redisson.api.MapOptions; +import org.redisson.api.*; import org.redisson.api.options.*; import org.redisson.client.codec.Codec; import org.redisson.codec.JsonCodec; -import org.redisson.command.CommandAsyncService; import org.redisson.config.Config; import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; @@ -91,7 +90,7 @@ public class RedissonRx implements RedissonRxClient { PlainParams params = (PlainParams) options; return RxProxyBuilder.create(commandExecutor, - new RedissonStream(params.getCodec(), new CommandAsyncService(commandExecutor, params), params.getName()), RStreamRx.class); + new RedissonStream(params.getCodec(), new CommandRxService(commandExecutor, params), params.getName()), RStreamRx.class); } @Override @@ -108,7 +107,7 @@ public class RedissonRx implements RedissonRxClient { public RSearchRx getSearch(OptionalOptions options) { OptionalParams params = (OptionalParams) options; return RxProxyBuilder.create(commandExecutor, - new RedissonSearch(params.getCodec(), new CommandAsyncService(commandExecutor, params)), RSearchRx.class); + new RedissonSearch(params.getCodec(), new CommandRxService(commandExecutor, params)), RSearchRx.class); } @Override @@ -128,7 +127,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RGeoRx getGeo(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonScoredSortedSet set = new RedissonScoredSortedSet(params.getCodec(), ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, new RedissonGeo(params.getCodec(), ce, params.getName(), null), new RedissonScoredSortedSetRx(set), RGeoRx.class); @@ -143,7 +142,7 @@ public class RedissonRx implements RedissonRxClient { public RLockRx getFairLock(CommonOptions options) { CommonParams params = (CommonParams) options; return RxProxyBuilder.create(commandExecutor, - new RedissonFairLock(new CommandAsyncService(commandExecutor, params), params.getName()), RLockRx.class); + new RedissonFairLock(new CommandRxService(commandExecutor, params), params.getName()), RLockRx.class); } @Override @@ -155,7 +154,7 @@ public class RedissonRx implements RedissonRxClient { public RRateLimiterRx getRateLimiter(CommonOptions options) { CommonParams params = (CommonParams) options; return RxProxyBuilder.create(commandExecutor, - new RedissonRateLimiter(new CommandAsyncService(commandExecutor, params), params.getName()), RRateLimiterRx.class); + new RedissonRateLimiter(new CommandRxService(commandExecutor, params), params.getName()), RRateLimiterRx.class); } @Override @@ -182,7 +181,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RSemaphoreRx getSemaphore(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonSemaphore(ce, params.getName()), RSemaphoreRx.class); } @@ -194,7 +193,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonPermitExpirableSemaphore(ce, params.getName()), RPermitExpirableSemaphoreRx.class); } @@ -219,7 +218,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RLockRx getLock(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonLock(ce, params.getName()), RLockRx.class); } @@ -243,7 +242,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RFencedLockRx getFencedLock(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonFencedLock lock = new RedissonFencedLock(ce, params.getName()); return RxProxyBuilder.create(commandExecutor, lock, RFencedLockRx.class); } @@ -274,7 +273,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RCountDownLatchRx getCountDownLatch(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonCountDownLatch(ce, params.getName()), RCountDownLatchRx.class); } @@ -353,7 +352,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RBucketsRx getBuckets(OptionalOptions options) { OptionalParams params = (OptionalParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonBuckets(params.getCodec(), ce), RBucketsRx.class); } @@ -365,7 +364,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RJsonBucketRx getJsonBucket(JsonBucketOptions options) { JsonBucketParams params = (JsonBucketParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonJsonBucket<>(params.getCodec(), ce, params.getName()), RJsonBucketRx.class); } @@ -382,7 +381,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RHyperLogLogRx getHyperLogLog(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonHyperLogLog(params.getCodec(), ce, params.getName()), RHyperLogLogRx.class); } @@ -394,7 +393,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RIdGeneratorRx getIdGenerator(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonIdGenerator(ce, params.getName()), RIdGeneratorRx.class); } @@ -415,7 +414,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RListRx getList(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonList list = new RedissonList(params.getCodec(), ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, list, new RedissonListRx(list), RListRx.class); @@ -438,7 +437,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RListMultimapRx getListMultimap(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonListMultimap listMultimap = new RedissonListMultimap<>(params.getCodec(), ce, params.getName()); return RxProxyBuilder.create(commandExecutor, listMultimap, new RedissonListMultimapRx(listMultimap, commandExecutor), RListMultimapRx.class); @@ -553,7 +552,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RSetRx getSet(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonSet set = new RedissonSet(params.getCodec(), ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, set, new RedissonSetRx(set, this), RSetRx.class); @@ -576,7 +575,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RScoredSortedSetRx getScoredSortedSet(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonScoredSortedSet set = new RedissonScoredSortedSet(params.getCodec(), ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, set, new RedissonScoredSortedSetRx<>(set), RScoredSortedSetRx.class); @@ -592,7 +591,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RLexSortedSetRx getLexSortedSet(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonLexSortedSet set = new RedissonLexSortedSet(ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, set, new RedissonLexSortedSetRx(set), RLexSortedSetRx.class); @@ -613,7 +612,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RShardedTopicRx getShardedTopic(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RShardedTopic topic = new RedissonShardedTopic(params.getCodec(), ce, params.getName()); return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RShardedTopicRx.class); } @@ -633,7 +632,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RTopicRx getTopic(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RTopic topic = new RedissonTopic(params.getCodec(), ce, params.getName()); return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class); } @@ -651,7 +650,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RReliableTopicRx getReliableTopic(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonReliableTopic(params.getCodec(), ce, params.getName(), null), RReliableTopicRx.class); } @@ -669,7 +668,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RPatternTopicRx getPatternTopic(PatternTopicOptions options) { PatternTopicParams params = (PatternTopicParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(params.getCodec(), ce, params.getPattern()), RPatternTopicRx.class); } @@ -688,7 +687,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RQueueRx getQueue(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonQueue(params.getCodec(), ce, params.getName(), null), new RedissonListRx(new RedissonList(params.getCodec(), ce, params.getName(), null)), RQueueRx.class); } @@ -706,7 +705,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RRingBufferRx getRingBuffer(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonRingBuffer(params.getCodec(), ce, params.getName(), null), RRingBufferRx.class); } @@ -728,7 +727,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RBlockingQueueRx getBlockingQueue(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonBlockingQueue queue = new RedissonBlockingQueue(params.getCodec(), ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, queue, new RedissonBlockingQueueRx(queue), RBlockingQueueRx.class); @@ -751,7 +750,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RDequeRx getDeque(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonDeque queue = new RedissonDeque(params.getCodec(), ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, queue, new RedissonListRx(queue), RDequeRx.class); @@ -774,7 +773,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RTimeSeriesRx getTimeSeries(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RTimeSeries timeSeries = new RedissonTimeSeries<>(params.getCodec(), evictionScheduler, ce, params.getName()); return RxProxyBuilder.create(commandExecutor, timeSeries, new RedissonTimeSeriesRx<>(timeSeries, this), RTimeSeriesRx.class); @@ -797,7 +796,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RSetCacheRx getSetCache(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RSetCache set = new RedissonSetCache(params.getCodec(), evictionScheduler, ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, set, new RedissonSetCacheRx(set, this), RSetCacheRx.class); @@ -811,7 +810,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RAtomicLongRx getAtomicLong(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonAtomicLong(ce, params.getName()), RAtomicLongRx.class); } @@ -823,7 +822,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RAtomicDoubleRx getAtomicDouble(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(ce, params.getName()), RAtomicDoubleRx.class); } @@ -870,7 +869,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RBitSetRx getBitSet(CommonOptions options) { CommonParams params = (CommonParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonBitSet(ce, params.getName()), RBitSetRx.class); } @@ -887,7 +886,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RFunctionRx getFunction(OptionalOptions options) { OptionalParams params = (OptionalParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonFuction(ce, params.getCodec()), RFunctionRx.class); } @@ -904,7 +903,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RScriptRx getScript(OptionalOptions options) { OptionalParams params = (OptionalParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); return RxProxyBuilder.create(commandExecutor, new RedissonScript(ce, params.getCodec()), RScriptRx.class); } @@ -1068,7 +1067,7 @@ public class RedissonRx implements RedissonRxClient { @Override public RBlockingDequeRx getBlockingDeque(PlainOptions options) { PlainParams params = (PlainParams) options; - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonBlockingDeque deque = new RedissonBlockingDeque(params.getCodec(), ce, params.getName(), null); return RxProxyBuilder.create(commandExecutor, deque, new RedissonBlockingDequeRx(deque), RBlockingDequeRx.class); @@ -1097,7 +1096,7 @@ public class RedissonRx implements RedissonRxClient { PlainParams params = (PlainParams) options; String remoteName = RedissonObject.suffixName(params.getName(), "remoteService"); RRemoteService service = getRemoteService(remoteName); - CommandAsyncService ce = new CommandAsyncService(commandExecutor, params); + CommandRxService ce = new CommandRxService(commandExecutor, params); RedissonTransferQueue queue = new RedissonTransferQueue(params.getCodec(), ce, params.getName(), service); return RxProxyBuilder.create(commandExecutor, queue, new RedissonTransferQueueRx(queue), RTransferQueueRx.class);