refactoring

pull/5732/head
Nikita Koksharov 10 months ago
parent 86fd2d3e6c
commit 99bd0ea6d5

@ -91,7 +91,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RStreamReactive<K, V> getStream(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonStream<K, V>(params.getCodec(), ca, params.getName()), RStreamReactive.class);
}
@ -109,7 +109,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RSearchReactive getSearch(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonSearch(params.getCodec(), ca), RSearchReactive.class);
}
@ -128,7 +128,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RGeoReactive<V> getGeo(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonGeo<V>(params.getCodec(), ca, params.getName(), null),
new RedissonScoredSortedSetReactive<V>(params.getCodec(), ca, params.getName()), RGeoReactive.class);
@ -142,7 +142,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLockReactive getFairLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFairLock(ca, params.getName()), RLockReactive.class);
}
@ -154,7 +154,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RRateLimiterReactive getRateLimiter(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonRateLimiter(ca, params.getName()), RRateLimiterReactive.class);
}
@ -168,7 +168,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBinaryStreamReactive getBinaryStream(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonBinaryStream stream = new RedissonBinaryStream(ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, stream,
new RedissonBinaryStreamReactive(ca, stream), RBinaryStreamReactive.class);
@ -182,7 +182,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RSemaphoreReactive getSemaphore(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonSemaphore(ca, params.getName()), RSemaphoreReactive.class);
}
@ -194,7 +194,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonPermitExpirableSemaphore(ca, params.getName()), RPermitExpirableSemaphoreReactive.class);
}
@ -207,7 +207,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RReadWriteLockReactive getReadWriteLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return new RedissonReadWriteLockReactive(ca, params.getName());
}
@ -219,7 +219,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLockReactive getLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonLock(ca, params.getName()), RLockReactive.class);
}
@ -243,7 +243,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RFencedLockReactive getFencedLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonFencedLock lock = new RedissonFencedLock(ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, lock, RFencedLockReactive.class);
}
@ -274,7 +274,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RCountDownLatchReactive getCountDownLatch(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonCountDownLatch(ca, params.getName()), RCountDownLatchReactive.class);
}
@ -295,7 +295,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(org.redisson.api.options.MapCacheOptions<K, V> options) {
MapCacheParams<K, V> params = (MapCacheParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RMapCache<K, V> map = new RedissonMapCache<>(params.getCodec(), evictionScheduler, ca,
params.getName(), null, null, writeBehindService);
return ReactiveProxyBuilder.create(commandExecutor, map,
@ -315,7 +315,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBucketReactive<V> getBucket(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonBucket<V>(params.getCodec(), ca, params.getName()), RBucketReactive.class);
}
@ -333,7 +333,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBucketsReactive getBuckets(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBuckets(params.getCodec(), ca), RBucketsReactive.class);
}
@ -359,7 +359,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RJsonBucketReactive<V> getJsonBucket(JsonBucketOptions<V> options) {
JsonBucketParams<V> params = (JsonBucketParams<V>) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonJsonBucket<V>(params.getCodec(), ca, params.getName()), RJsonBucketReactive.class);
}
@ -377,7 +377,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonHyperLogLog<V>(params.getCodec(), ca, params.getName()), RHyperLogLogReactive.class);
}
@ -390,7 +390,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RIdGeneratorReactive getIdGenerator(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonIdGenerator(ca, params.getName()), RIdGeneratorReactive.class);
}
@ -409,7 +409,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RListReactive<V> getList(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonList<V>(params.getCodec(), ca, params.getName(), null),
new RedissonListReactive<V>(params.getCodec(), ca, params.getName()), RListReactive.class);
}
@ -429,7 +429,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RListMultimapReactive<K, V> getListMultimap(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonListMultimap<K, V>(params.getCodec(), ca, params.getName()),
new RedissonListMultimapReactive<K, V>(params.getCodec(), ca, params.getName()), RListMultimapReactive.class);
}
@ -449,7 +449,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap<K, V>(params.getCodec(), ca, params.getName()),
new RedissonSetMultimapReactive<K, V>(params.getCodec(), ca, params.getName(), this), RSetMultimapReactive.class);
}
@ -471,7 +471,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RListMultimapCacheReactive<K, V> getListMultimapCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonListMultimapCache<K, V> listMultimap = new RedissonListMultimapCache<>(evictionScheduler, params.getCodec(), ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, listMultimap,
new RedissonListMultimapCacheReactive<>(listMultimap, ca), RListMultimapCacheReactive.class);
@ -494,7 +494,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RSetMultimapCacheReactive<K, V> getSetMultimapCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonSetMultimapCache<K, V> setMultimap = new RedissonSetMultimapCache<>(evictionScheduler, params.getCodec(), ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, setMultimap,
new RedissonSetMultimapCacheReactive<K, V>(setMultimap, ca, this), RSetMultimapCacheReactive.class);
@ -517,7 +517,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RMapReactive<K, V> getMap(org.redisson.api.options.MapOptions<K, V> options) {
MapParams<K, V> params = (MapParams<K, V>) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonMap<K, V> map = new RedissonMap<>(params.getCodec(), ca, params.getName(), null, null, writeBehindService);
return ReactiveProxyBuilder.create(commandExecutor, map,
new RedissonMapReactive<K, V>(map, ca), RMapReactive.class);
@ -540,7 +540,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RSetReactive<V> getSet(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonSet<V> set = new RedissonSet<V>(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(set, this), RSetReactive.class);
@ -561,7 +561,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RScoredSortedSetReactive<V> getScoredSortedSet(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonScoredSortedSet<V>(params.getCodec(), ca, params.getName(), null),
new RedissonScoredSortedSetReactive<V>(params.getCodec(), ca, params.getName()), RScoredSortedSetReactive.class);
}
@ -577,7 +577,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLexSortedSetReactive getLexSortedSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonLexSortedSet set = new RedissonLexSortedSet(ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonLexSortedSetReactive(set),
@ -601,7 +601,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RShardedTopicReactive getShardedTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonShardedTopic topic = new RedissonShardedTopic(params.getCodec(), ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonTopicReactive(topic), RShardedTopicReactive.class);
@ -624,7 +624,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RTopicReactive getTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonTopic topic = new RedissonTopic(params.getCodec(), ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonTopicReactive(topic), RTopicReactive.class);
@ -647,7 +647,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RReliableTopicReactive getReliableTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonReliableTopic topic = new RedissonReliableTopic(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonReliableTopicReactive(topic), RReliableTopicReactive.class);
@ -666,7 +666,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RPatternTopicReactive getPatternTopic(PatternTopicOptions options) {
PatternTopicParams params = (PatternTopicParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonPatternTopic(params.getCodec(), ca, params.getPattern()), RPatternTopicReactive.class);
}
@ -686,7 +686,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RQueueReactive<V> getQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonQueue<V>(params.getCodec(), ca, params.getName(), null),
new RedissonListReactive<V>(params.getCodec(), ca, params.getName()), RQueueReactive.class);
}
@ -704,7 +704,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RRingBufferReactive<V> getRingBuffer(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonRingBuffer<V>(params.getCodec(), ca, params.getName(), null), RRingBufferReactive.class);
}
@ -726,7 +726,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonBlockingQueueReactive<V>(queue), RBlockingQueueReactive.class);
@ -747,7 +747,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RDequeReactive<V> getDeque(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonDeque<V>(params.getCodec(), ca, params.getName(), null),
new RedissonListReactive<V>(params.getCodec(), ca, params.getName()), RDequeReactive.class);
}
@ -769,7 +769,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V, L> RTimeSeriesReactive<V, L> getTimeSeries(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<>(params.getCodec(), evictionScheduler, ca, params.getName());
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V, L>(timeSeries, this), RTimeSeriesReactive.class);
@ -792,7 +792,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RSetCacheReactive<V> getSetCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RSetCache<V> set = new RedissonSetCache<V>(params.getCodec(), evictionScheduler, ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(set, this), RSetCacheReactive.class);
@ -806,7 +806,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RAtomicLongReactive getAtomicLong(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonAtomicLong(ca, params.getName()), RAtomicLongReactive.class);
}
@ -819,7 +819,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RAtomicDoubleReactive getAtomicDouble(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(ca, params.getName()), RAtomicDoubleReactive.class);
}
@ -850,7 +850,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RRemoteService getRemoteService(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
String executorId = connectionManager.getServiceManager().getId();
if (params.getCodec() != null && params.getCodec() != connectionManager.getServiceManager().getCfg().getCodec()) {
executorId = executorId + ":" + params.getName();
@ -866,7 +866,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBitSetReactive getBitSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(ca, params.getName()), RBitSetReactive.class);
}
@ -883,7 +883,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RFunctionReactive getFunction(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(ca, params.getCodec()), RFunctionReactive.class);
}
@ -900,7 +900,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RScriptReactive getScript(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonScript(ca, params.getCodec()), RScriptReactive.class);
}
@ -922,7 +922,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RKeysReactive getKeys(KeysOptions options) {
KeysParams params = (KeysParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonKeys(ca), new RedissonKeysReactive(ca), RKeysReactive.class);
}
@ -1035,7 +1035,7 @@ public class RedissonReactive implements RedissonReactiveClient {
ops.writerRetryAttempts(params.getWriteRetryAttempts());
}
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RMap<K, V> map = new RedissonLocalCachedMap<>(params.getCodec(), ca, params.getName(),
ops, evictionScheduler, null, writeBehindService);
return ReactiveProxyBuilder.create(commandExecutor, map,
@ -1064,7 +1064,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBlockingDequeReactive<V> getBlockingDeque(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(params.getCodec(), ca, params.getName(), null);
return ReactiveProxyBuilder.create(commandExecutor, deque,
new RedissonBlockingDequeReactive<V>(deque), RBlockingDequeReactive.class);
@ -1091,7 +1091,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RTransferQueueReactive<V> getTransferQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveService ca = new CommandReactiveService(commandExecutor, params);
CommandReactiveExecutor ca = commandExecutor.copy(params);
String remoteName = RedissonObject.suffixName(params.getName(), "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(params.getCodec(), ca, params.getName(), service);

@ -90,7 +90,7 @@ public class RedissonRx implements RedissonRxClient {
PlainParams params = (PlainParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonStream<K, V>(params.getCodec(), new CommandRxService(commandExecutor, params), params.getName()), RStreamRx.class);
new RedissonStream<K, V>(params.getCodec(), commandExecutor.copy(params), params.getName()), RStreamRx.class);
}
@Override
@ -107,7 +107,7 @@ public class RedissonRx implements RedissonRxClient {
public RSearchRx getSearch(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonSearch(params.getCodec(), new CommandRxService(commandExecutor, params)), RSearchRx.class);
new RedissonSearch(params.getCodec(), commandExecutor.copy(params)), RSearchRx.class);
}
@Override
@ -127,7 +127,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RGeoRx<V> getGeo(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonScoredSortedSet<V> set = new RedissonScoredSortedSet<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, new RedissonGeo<V>(params.getCodec(), ce, params.getName(), null),
new RedissonScoredSortedSetRx<V>(set), RGeoRx.class);
@ -142,7 +142,7 @@ public class RedissonRx implements RedissonRxClient {
public RLockRx getFairLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonFairLock(new CommandRxService(commandExecutor, params), params.getName()), RLockRx.class);
new RedissonFairLock(commandExecutor.copy(params), params.getName()), RLockRx.class);
}
@Override
@ -154,7 +154,7 @@ public class RedissonRx implements RedissonRxClient {
public RRateLimiterRx getRateLimiter(CommonOptions options) {
CommonParams params = (CommonParams) options;
return RxProxyBuilder.create(commandExecutor,
new RedissonRateLimiter(new CommandRxService(commandExecutor, params), params.getName()), RRateLimiterRx.class);
new RedissonRateLimiter(commandExecutor.copy(params), params.getName()), RRateLimiterRx.class);
}
@Override
@ -167,7 +167,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RBinaryStreamRx getBinaryStream(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonBinaryStream stream = new RedissonBinaryStream(ce, params.getName());
return RxProxyBuilder.create(commandExecutor, stream,
new RedissonBinaryStreamRx(ce, stream), RBinaryStreamRx.class);
@ -181,7 +181,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RSemaphoreRx getSemaphore(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonSemaphore(ce, params.getName()), RSemaphoreRx.class);
}
@ -193,7 +193,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor,
new RedissonPermitExpirableSemaphore(ce, params.getName()), RPermitExpirableSemaphoreRx.class);
}
@ -206,7 +206,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RReadWriteLockRx getReadWriteLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return new RedissonReadWriteLockRx(ce, params.getName());
}
@ -218,7 +218,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RLockRx getLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonLock(ce, params.getName()), RLockRx.class);
}
@ -242,7 +242,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RFencedLockRx getFencedLock(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonFencedLock lock = new RedissonFencedLock(ce, params.getName());
return RxProxyBuilder.create(commandExecutor, lock, RFencedLockRx.class);
}
@ -273,7 +273,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RCountDownLatchRx getCountDownLatch(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonCountDownLatch(ce, params.getName()), RCountDownLatchRx.class);
}
@ -314,7 +314,7 @@ public class RedissonRx implements RedissonRxClient {
ops.writerRetryAttempts(params.getWriteRetryAttempts());
}
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonMapCache<K, V> map = new RedissonMapCache<>(params.getCodec(), evictionScheduler,
ce, params.getName(), null, ops, writeBehindService);
return RxProxyBuilder.create(commandExecutor, map,
@ -334,7 +334,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RBucketRx<V> getBucket(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor,
new RedissonBucket<V>(params.getCodec(), ce, params.getName()), RBucketRx.class);
}
@ -352,7 +352,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RBucketsRx getBuckets(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonBuckets(params.getCodec(), ce), RBucketsRx.class);
}
@ -364,7 +364,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RJsonBucketRx<V> getJsonBucket(JsonBucketOptions<V> options) {
JsonBucketParams<V> params = (JsonBucketParams<V>) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonJsonBucket<>(params.getCodec(), ce, params.getName()), RJsonBucketRx.class);
}
@ -381,7 +381,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RHyperLogLogRx<V> getHyperLogLog(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonHyperLogLog<V>(params.getCodec(), ce, params.getName()), RHyperLogLogRx.class);
}
@ -393,7 +393,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RIdGeneratorRx getIdGenerator(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonIdGenerator(ce, params.getName()), RIdGeneratorRx.class);
}
@ -414,7 +414,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RListRx<V> getList(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonList<V> list = new RedissonList<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, list,
new RedissonListRx<V>(list), RListRx.class);
@ -437,7 +437,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RListMultimapRx<K, V> getListMultimap(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonListMultimap<K, V> listMultimap = new RedissonListMultimap<>(params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, listMultimap,
new RedissonListMultimapRx<K, V>(listMultimap, commandExecutor), RListMultimapRx.class);
@ -460,7 +460,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RListMultimapCacheRx<K, V> getListMultimapCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonListMultimapCache<K, V> listMultimap = new RedissonListMultimapCache<>(evictionScheduler, params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, listMultimap,
new RedissonListMultimapCacheRx<K, V>(listMultimap, ce), RListMultimapCacheRx.class);
@ -483,7 +483,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RSetMultimapRx<K, V> getSetMultimap(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonSetMultimap<K, V> setMultimap = new RedissonSetMultimap<>(params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, setMultimap,
new RedissonSetMultimapRx<>(setMultimap, ce, this), RSetMultimapRx.class);
@ -506,7 +506,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RSetMultimapCacheRx<K, V> getSetMultimapCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonSetMultimapCache<K, V> setMultimap = new RedissonSetMultimapCache<>(evictionScheduler, params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, setMultimap,
new RedissonSetMultimapCacheRx<>(setMultimap, ce, this), RSetMultimapCacheRx.class);
@ -529,7 +529,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RMapRx<K, V> getMap(org.redisson.api.options.MapOptions<K, V> options) {
MapParams<K, V> params = (MapParams<K, V>) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonMap<K, V> map = new RedissonMap<>(params.getCodec(), ce, params.getName(), null, null, writeBehindService);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<>(map, ce), RMapRx.class);
@ -552,7 +552,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RSetRx<V> getSet(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonSet<V> set = new RedissonSet<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetRx<V>(set, this), RSetRx.class);
@ -575,7 +575,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RScoredSortedSetRx<V> getScoredSortedSet(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonScoredSortedSet<V> set = new RedissonScoredSortedSet<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonScoredSortedSetRx<>(set), RScoredSortedSetRx.class);
@ -591,7 +591,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RLexSortedSetRx getLexSortedSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonLexSortedSet set = new RedissonLexSortedSet(ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonLexSortedSetRx(set), RLexSortedSetRx.class);
@ -612,7 +612,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RShardedTopicRx getShardedTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RShardedTopic topic = new RedissonShardedTopic(params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RShardedTopicRx.class);
}
@ -632,7 +632,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RTopicRx getTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RTopic topic = new RedissonTopic(params.getCodec(), ce, params.getName());
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class);
}
@ -650,7 +650,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RReliableTopicRx getReliableTopic(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor,
new RedissonReliableTopic(params.getCodec(), ce, params.getName(), null), RReliableTopicRx.class);
}
@ -668,7 +668,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RPatternTopicRx getPatternTopic(PatternTopicOptions options) {
PatternTopicParams params = (PatternTopicParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(params.getCodec(), ce, params.getPattern()), RPatternTopicRx.class);
}
@ -687,7 +687,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RQueueRx<V> getQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonQueue<V>(params.getCodec(), ce, params.getName(), null),
new RedissonListRx<V>(new RedissonList<V>(params.getCodec(), ce, params.getName(), null)), RQueueRx.class);
}
@ -705,7 +705,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RRingBufferRx<V> getRingBuffer(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor,
new RedissonRingBuffer<V>(params.getCodec(), ce, params.getName(), null), RRingBufferRx.class);
}
@ -727,7 +727,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RBlockingQueueRx<V> getBlockingQueue(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonBlockingQueueRx<V>(queue), RBlockingQueueRx.class);
@ -750,7 +750,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RDequeRx<V> getDeque(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonDeque<V> queue = new RedissonDeque<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonListRx<V>(queue), RDequeRx.class);
@ -773,7 +773,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V, L> RTimeSeriesRx<V, L> getTimeSeries(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<>(params.getCodec(), evictionScheduler, ce, params.getName());
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<>(timeSeries, this), RTimeSeriesRx.class);
@ -796,7 +796,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RSetCacheRx<V> getSetCache(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RSetCache<V> set = new RedissonSetCache<V>(params.getCodec(), evictionScheduler, ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheRx<V>(set, this), RSetCacheRx.class);
@ -810,7 +810,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RAtomicLongRx getAtomicLong(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonAtomicLong(ce, params.getName()), RAtomicLongRx.class);
}
@ -822,7 +822,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RAtomicDoubleRx getAtomicDouble(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(ce, params.getName()), RAtomicDoubleRx.class);
}
@ -869,7 +869,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RBitSetRx getBitSet(CommonOptions options) {
CommonParams params = (CommonParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonBitSet(ce, params.getName()), RBitSetRx.class);
}
@ -886,7 +886,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RFunctionRx getFunction(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonFuction(ce, params.getCodec()), RFunctionRx.class);
}
@ -903,7 +903,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RScriptRx getScript(OptionalOptions options) {
OptionalParams params = (OptionalParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonScript(ce, params.getCodec()), RScriptRx.class);
}
@ -925,7 +925,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RKeysRx getKeys(KeysOptions options) {
KeysParams params = (KeysParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor, new RedissonKeys(ce), new RedissonKeysRx(ce), RKeysRx.class);
}
@ -1037,7 +1037,7 @@ public class RedissonRx implements RedissonRxClient {
ops.writerRetryAttempts(params.getWriteRetryAttempts());
}
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RMap<K, V> map = new RedissonLocalCachedMap<>(params.getCodec(), ce, params.getName(),
ops, evictionScheduler, null, writeBehindService);
@ -1067,7 +1067,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <V> RBlockingDequeRx<V> getBlockingDeque(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(params.getCodec(), ce, params.getName(), null);
return RxProxyBuilder.create(commandExecutor, deque,
new RedissonBlockingDequeRx<V>(deque), RBlockingDequeRx.class);
@ -1096,7 +1096,7 @@ public class RedissonRx implements RedissonRxClient {
PlainParams params = (PlainParams) options;
String remoteName = RedissonObject.suffixName(params.getName(), "remoteService");
RRemoteService service = getRemoteService(remoteName);
CommandRxService ce = new CommandRxService(commandExecutor, params);
CommandRxExecutor ce = commandExecutor.copy(params);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(params.getCodec(), ce, params.getName(), service);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueRx<V>(queue), RTransferQueueRx.class);

@ -16,6 +16,7 @@
package org.redisson.reactive;
import org.redisson.api.RFuture;
import org.redisson.api.options.ObjectParams;
import org.redisson.command.CommandAsyncExecutor;
import reactor.core.publisher.Mono;
@ -30,4 +31,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Mono<R> reactive(Callable<RFuture<R>> supplier);
CommandReactiveExecutor copy(ObjectParams objectParams);
}

@ -15,9 +15,6 @@
*/
package org.redisson.reactive;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import org.redisson.api.RFuture;
import org.redisson.api.options.ObjectParams;
import org.redisson.command.CommandAsyncExecutor;
@ -27,6 +24,9 @@ import org.redisson.liveobject.core.RedissonObjectBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
/**
*
* @author Nikita Koksharov
@ -42,6 +42,10 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
super(executor, objectParams);
}
public CommandReactiveExecutor copy(ObjectParams objectParams) {
return new CommandReactiveService(this, objectParams);
}
@Override
public <R> Mono<R> reactive(Callable<RFuture<R>> supplier) {
return Flux.<R>create(emitter -> {

@ -17,6 +17,7 @@ package org.redisson.rx;
import io.reactivex.rxjava3.core.Flowable;
import org.redisson.api.RFuture;
import org.redisson.api.options.ObjectParams;
import org.redisson.command.CommandAsyncExecutor;
import java.util.concurrent.Callable;
@ -30,4 +31,6 @@ public interface CommandRxExecutor extends CommandAsyncExecutor {
<R> Flowable<R> flowable(Callable<RFuture<R>> supplier);
CommandRxExecutor copy(ObjectParams objectParams);
}

@ -44,6 +44,10 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
super(executor, objectParams);
}
public CommandRxExecutor copy(ObjectParams objectParams) {
return new CommandRxService(this, objectParams);
}
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
ReplayProcessor<R> p = ReplayProcessor.create();

Loading…
Cancel
Save