RxJava 2 support #1177

pull/1705/head
Nikita 7 years ago
parent 30c9f9d59c
commit 57f935ebc5

@ -77,6 +77,11 @@
<artifactId>reactor-stream</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.13</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>

@ -77,6 +77,7 @@ import org.redisson.api.RTopic;
import org.redisson.api.RTransaction;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
@ -166,21 +167,44 @@ public class Redisson implements RedissonClient {
}
/**
* Create reactive Redisson instance with default config
* Create Reactive Redisson instance with default config
*
* @return Redisson instance
*/
public static RedissonReactiveClient createRx() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return createReactive(config);
}
/**
* Create Reactive Redisson instance with provided config
*
* @param config for Redisson
* @return Redisson instance
*/
public static RedissonRxClient createRx(Config config) {
RedissonRx react = new RedissonRx(config);
// if (config.isReferenceEnabled()) {
// react.enableRedissonReferenceSupport();
// }
return react;
}
/**
* Create Reactive Redisson instance with default config
*
* @return Redisson instance
*/
public static RedissonReactiveClient createReactive() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399");
// config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379");
// config.useClusterServers().addNodeAddress("127.0.0.1:7000");
return createReactive(config);
}
/**
* Create reactive Redisson instance with provided config
* Create Reactive Redisson instance with provided config
*
* @param config for Redisson
* @return Redisson instance

@ -1216,6 +1216,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return get(scanIteratorAsync(name, client, startPos, pattern, count));
}
@Override
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());

@ -0,0 +1,451 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.api.ClusterNode;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
import org.redisson.api.RBucketRx;
import org.redisson.api.RKeysRx;
import org.redisson.api.RMapCacheRx;
import org.redisson.api.RMapRx;
import org.redisson.api.RSetRx;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.rx.CommandRxExecutor;
import org.redisson.rx.CommandRxService;
import org.redisson.rx.RedissonKeysRx;
import org.redisson.rx.RedissonMapCacheRx;
import org.redisson.rx.RedissonMapRx;
import org.redisson.rx.RedissonSetRx;
import org.redisson.rx.RxProxyBuilder;
/**
* Main infrastructure class allows to get access
* to all Redisson objects on top of Redis server.
*
* @author Nikita Koksharov
*
*/
public class RedissonRx implements RedissonRxClient {
protected final EvictionScheduler evictionScheduler;
protected final CommandRxExecutor commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ReferenceCodecProvider codecProvider;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
protected RedissonRx(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandRxService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
codecProvider = config.getReferenceCodecProvider();
}
// @Override
// public <K, V> RStreamReactive<K, V> getStream(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonStream<K, V>(commandExecutor, name), RStreamReactive.class);
// }
//
// @Override
// public <K, V> RStreamReactive<K, V> getStream(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonStream<K, V>(codec, commandExecutor, name), RStreamReactive.class);
// }
//
// @Override
// public <V> RGeoReactive<V> getGeo(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonGeo<V>(commandExecutor, name, null),
// new RedissonScoredSortedSetReactive<V>(commandExecutor, name), RGeoReactive.class);
// }
//
// @Override
// public <V> RGeoReactive<V> getGeo(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonGeo<V>(codec, commandExecutor, name, null),
// new RedissonScoredSortedSetReactive<V>(codec, commandExecutor, name), RGeoReactive.class);
// }
//
// @Override
// public RLockReactive getFairLock(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonFairLock(commandExecutor, name), RLockReactive.class);
// }
//
// @Override
// public RRateLimiterReactive getRateLimiter(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonRateLimiter(commandExecutor, name), RRateLimiterReactive.class);
// }
//
// @Override
// public RSemaphoreReactive getSemaphore(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonSemaphore(commandExecutor, name, semaphorePubSub), RSemaphoreReactive.class);
// }
//
// @Override
// public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub), RPermitExpirableSemaphoreReactive.class);
// }
//
// @Override
// public RReadWriteLockReactive getReadWriteLock(String name) {
// return new RedissonReadWriteLockReactive(commandExecutor, name);
// }
//
// @Override
// public RLockReactive getLock(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonLock(commandExecutor, name), RLockReactive.class);
// }
//
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
}
@Override
public <V> RBucketRx<V> getBucket(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonBucket<V>(commandExecutor, name), RBucketRx.class);
}
@Override
public <V> RBucketRx<V> getBucket(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonBucket<V>(codec, commandExecutor, name), RBucketRx.class);
}
// @Override
// public <V> List<RBucketReactive<V>> findBuckets(String pattern) {
// RKeys redissonKeys = new RedissonKeys(commandExecutor);
// Iterable<String> keys = redissonKeys.getKeysByPattern(pattern);
//
// List<RBucketReactive<V>> buckets = new ArrayList<RBucketReactive<V>>();
// for (Object key : keys) {
// if(key != null) {
// buckets.add(this.<V>getBucket(key.toString()));
// }
// }
// return buckets;
// }
//
// @Override
// public <V> RHyperLogLogReactive<V> getHyperLogLog(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog<V>(commandExecutor, name), RHyperLogLogReactive.class);
// }
//
// @Override
// public <V> RHyperLogLogReactive<V> getHyperLogLog(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog<V>(codec, commandExecutor, name), RHyperLogLogReactive.class);
// }
//
// @Override
// public <V> RListReactive<V> getList(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonList<V>(commandExecutor, name, null),
// new RedissonListReactive<V>(commandExecutor, name), RListReactive.class);
// }
//
// @Override
// public <V> RListReactive<V> getList(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonList<V>(codec, commandExecutor, name, null),
// new RedissonListReactive<V>(codec, commandExecutor, name), RListReactive.class);
// }
//
// @Override
// public <K, V> RListMultimapReactive<K, V> getListMultimap(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonListMultimap<K, V>(commandExecutor, name),
// new RedissonListMultimapReactive<K, V>(commandExecutor, name), RListMultimapReactive.class);
// }
//
// @Override
// public <K, V> RListMultimapReactive<K, V> getListMultimap(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonListMultimap<K, V>(codec, commandExecutor, name),
// new RedissonListMultimapReactive<K, V>(codec, commandExecutor, name), RListMultimapReactive.class);
// }
//
// @Override
// public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap<K, V>(commandExecutor, name),
// new RedissonSetMultimapReactive<K, V>(commandExecutor, name), RSetMultimapReactive.class);
// }
//
// @Override
// public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap<K, V>(codec, commandExecutor, name),
// new RedissonSetMultimapReactive<K, V>(codec, commandExecutor, name), RSetMultimapReactive.class);
// }
@Override
public <K, V> RMapRx<K, V> getMap(String name) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, Codec codec) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
}
@Override
public <V> RSetRx<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetRx<V>(set), RSetRx.class);
}
@Override
public <V> RSetRx<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetRx<V>(set), RSetRx.class);
}
// @Override
// public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonScoredSortedSet<V>(commandExecutor, name, null),
// new RedissonScoredSortedSetReactive<V>(commandExecutor, name), RScoredSortedSetReactive.class);
// }
//
// @Override
// public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonScoredSortedSet<V>(codec, commandExecutor, name, null),
// new RedissonScoredSortedSetReactive<V>(codec, commandExecutor, name), RScoredSortedSetReactive.class);
// }
//
// @Override
// public RLexSortedSetReactive getLexSortedSet(String name) {
// RedissonLexSortedSet set = new RedissonLexSortedSet(commandExecutor, name, null);
// return ReactiveProxyBuilder.create(commandExecutor, set,
// new RedissonLexSortedSetReactive(set),
// RLexSortedSetReactive.class);
// }
//
// @Override
// public <M> RTopicReactive<M> getTopic(String name) {
// return new RedissonTopicReactive<M>(commandExecutor, name);
// }
//
// @Override
// public <M> RTopicReactive<M> getTopic(String name, Codec codec) {
// return new RedissonTopicReactive<M>(codec, commandExecutor, name);
// }
//
// @Override
// public <M> RPatternTopicReactive<M> getPatternTopic(String pattern) {
// return new RedissonPatternTopicReactive<M>(commandExecutor, pattern);
// }
//
// @Override
// public <M> RPatternTopicReactive<M> getPatternTopic(String pattern, Codec codec) {
// return new RedissonPatternTopicReactive<M>(codec, commandExecutor, pattern);
// }
//
// @Override
// public <V> RQueueReactive<V> getQueue(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonQueue<V>(commandExecutor, name, null),
// new RedissonListReactive<V>(commandExecutor, name), RQueueReactive.class);
// }
//
// @Override
// public <V> RQueueReactive<V> getQueue(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonQueue<V>(codec, commandExecutor, name, null),
// new RedissonListReactive<V>(codec,commandExecutor, name), RQueueReactive.class);
// }
//
// @Override
// public <V> RBlockingQueueReactive<V> getBlockingQueue(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingQueue<V>(commandExecutor, name, null),
// new RedissonListReactive<V>(commandExecutor, name), RBlockingQueueReactive.class);
// }
//
// @Override
// public <V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingQueue<V>(codec, commandExecutor, name, null),
// new RedissonListReactive<V>(codec, commandExecutor, name), RBlockingQueueReactive.class);
// }
//
// @Override
// public <V> RDequeReactive<V> getDeque(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonDeque<V>(commandExecutor, name, null),
// new RedissonListReactive<V>(commandExecutor, name), RDequeReactive.class);
// }
//
// @Override
// public <V> RDequeReactive<V> getDeque(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonDeque<V>(codec, commandExecutor, name, null),
// new RedissonListReactive<V>(codec, commandExecutor, name), RDequeReactive.class);
// }
//
// @Override
// public <V> RSetCacheReactive<V> getSetCache(String name) {
// RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
// return ReactiveProxyBuilder.create(commandExecutor, set,
// new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
// }
//
// @Override
// public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
// RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
// return ReactiveProxyBuilder.create(commandExecutor, set,
// new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
// }
//
// @Override
// public RAtomicLongReactive getAtomicLong(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicLong(commandExecutor, name), RAtomicLongReactive.class);
// }
//
// @Override
// public RAtomicDoubleReactive getAtomicDouble(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(commandExecutor, name), RAtomicDoubleReactive.class);
// }
//
// @Override
// public RBitSetReactive getBitSet(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(commandExecutor, name), RBitSetReactive.class);
// }
//
// @Override
// public RScriptReactive getScript() {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonScript(commandExecutor), RScriptReactive.class);
// }
//
// @Override
// public RBatchReactive createBatch(BatchOptions options) {
// RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager, options);
// if (config.isReferenceEnabled()) {
// batch.enableRedissonReferenceSupport(this);
// }
// return batch;
// }
//
// @Override
// public RBatchReactive createBatch() {
// return createBatch(BatchOptions.defaults());
// }
@Override
public RKeysRx getKeys() {
return RxProxyBuilder.create(commandExecutor, new RedissonKeys(commandExecutor), new RedissonKeysRx(commandExecutor), RKeysRx.class);
}
@Override
public Config getConfig() {
return config;
}
@Override
public ReferenceCodecProvider getCodecProvider() {
return codecProvider;
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
}
@Override
public NodesGroup<ClusterNode> getClusterNodesGroup() {
if (!connectionManager.isClusterMode()) {
throw new IllegalStateException("Redisson not in cluster mode!");
}
return new RedisNodes<ClusterNode>(connectionManager);
}
@Override
public void shutdown() {
connectionManager.shutdown();
}
@Override
public boolean isShutdown() {
return connectionManager.isShutdown();
}
@Override
public boolean isShuttingDown() {
return connectionManager.isShuttingDown();
}
// protected void enableRedissonReferenceSupport() {
// this.commandExecutor.enableRedissonReferenceSupport(this);
// }
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, MapOptions<K, V> options) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, options);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, options);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
}
// @Override
// public RTransactionReactive createTransaction(TransactionOptions options) {
// return new RedissonTransactionReactive(commandExecutor, options);
// }
//
// @Override
// public <V> RBlockingDequeReactive<V> getBlockingDeque(String name) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingDeque<V>(commandExecutor, name, null),
// new RedissonListReactive<V>(commandExecutor, name), RBlockingDequeReactive.class);
// }
//
// @Override
// public <V> RBlockingDequeReactive<V> getBlockingDeque(String name, Codec codec) {
// return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingDeque<V>(codec, commandExecutor, name, null),
// new RedissonListReactive<V>(codec, commandExecutor, name), RBlockingDequeReactive.class);
// }
}

@ -0,0 +1,111 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import io.reactivex.Flowable;
/**
* Reactive implementation of object holder. Max size of object is 512MB
*
* @author Nikita Koksharov
*
* @param <V> - the type of object
*/
public interface RBucketRx<V> extends RExpirableRx {
/**
* Returns size of object in bytes
*
* @return object size
*/
Flowable<Long> size();
/**
* Tries to set element atomically into empty holder.
*
* @param value - value to set
* @return {@code true} if successful, or {@code false} if
* element was already set
*/
Flowable<Boolean> trySet(V value);
/**
* Tries to set element atomically into empty holder with defined <code>timeToLive</code> interval.
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return {@code true} if successful, or {@code false} if
* element was already set
*/
Flowable<Boolean> trySet(V value, long timeToLive, TimeUnit timeUnit);
/**
* Atomically sets the value to the given updated value
* only if serialized state of the current value equals
* to serialized state of the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful; or {@code false} if the actual value
* was not equal to the expected value.
*/
Flowable<Boolean> compareAndSet(V expect, V update);
/**
* Retrieves current element in the holder and replaces it with <code>newValue</code>.
*
* @param newValue - value to set
* @return previous value
*/
Flowable<V> getAndSet(V newValue);
/**
* Retrieves element stored in the holder.
*
* @return element
*/
Flowable<V> get();
/**
* Retrieves element in the holder and removes it.
*
* @return element
*/
Flowable<V> getAndDelete();
/**
* Stores element into the holder.
*
* @param value - value to set
* @return void
*/
Flowable<Void> set(V value);
/**
* Stores element into the holder with defined <code>timeToLive</code> interval.
*
* @param value - value to set
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return void
*/
Flowable<Void> set(V value, long timeToLive, TimeUnit timeUnit);
}

@ -0,0 +1,121 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import org.reactivestreams.Publisher;
import io.reactivex.Flowable;
/**
* Common reactive interface for collection object
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public interface RCollectionRx<V> extends RExpirableRx {
/**
* Returns iterator over collection elements
*
* @return iterator
*/
Flowable<V> iterator();
/**
* Retains only the elements in this collection that are contained in the
* specified collection (optional operation).
*
* @param c collection containing elements to be retained in this collection
* @return <code>true</code> if this collection changed as a result of the call
*/
Flowable<Boolean> retainAll(Collection<?> c);
/**
* Removes all of this collection's elements that are also contained in the
* specified collection (optional operation).
*
* @param c collection containing elements to be removed from this collection
* @return <code>true</code> if this collection changed as a result of the
* call
*/
Flowable<Boolean> removeAll(Collection<?> c);
/**
* Returns <code>true</code> if this collection contains encoded state of the specified element.
*
* @param o element whose presence in this collection is to be tested
* @return <code>true</code> if this collection contains the specified
* element and <code>false</code> otherwise
*/
Flowable<Boolean> contains(V o);
/**
* Returns <code>true</code> if this collection contains all of the elements
* in the specified collection.
*
* @param c collection to be checked for containment in this collection
* @return <code>true</code> if this collection contains all of the elements
* in the specified collection
*/
Flowable<Boolean> containsAll(Collection<?> c);
/**
* Removes a single instance of the specified element from this
* collection, if it is present (optional operation).
*
* @param o element to be removed from this collection, if present
* @return <code>true</code> if an element was removed as a result of this call
*/
Flowable<Boolean> remove(V o);
/**
* Returns number of elements in this collection.
*
* @return size of collection
*/
Flowable<Integer> size();
/**
* Adds element into this collection.
*
* @param e - element to add
* @return <code>true</code> if an element was added
* and <code>false</code> if it is already present
*/
Flowable<Boolean> add(V e);
/**
* Adds all elements contained in the specified collection
*
* @param c - collection of elements to add
* @return <code>true</code> if at least one element was added
* and <code>false</code> if all elements are already present
*/
Flowable<Boolean> addAll(Publisher<? extends V> c);
/**
* Adds all elements contained in the specified collection
*
* @param c - collection of elements to add
* @return <code>true</code> if at least one element was added
* and <code>false</code> if all elements are already present
*/
Flowable<Boolean> addAll(Collection<? extends V> c);
}

@ -0,0 +1,77 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import io.reactivex.Flowable;
/**
* Base interface for all Redisson objects
* which support expiration or TTL
*
* @author Nikita Koksharov
*
*/
public interface RExpirableRx extends RObjectRx {
/**
* Set a timeout for object in mode. After the timeout has expired,
* the key will automatically be deleted.
*
* @param timeToLive - timeout before object will be deleted
* @param timeUnit - timeout time unit
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Flowable<Boolean> expire(long timeToLive, TimeUnit timeUnit);
/**
* Set an expire date for object in mode. When expire date comes
* the key will automatically be deleted.
*
* @param timestamp - expire date
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Flowable<Boolean> expireAt(Date timestamp);
/**
* Set an expire date for object in mode. When expire date comes
* the key will automatically be deleted.
*
* @param timestamp - expire date in milliseconds (Unix timestamp)
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Flowable<Boolean> expireAt(long timestamp);
/**
* Clear an expire timeout or expire date for object in mode.
* Object will not be deleted.
*
* @return <code>true</code> if the timeout was cleared and <code>false</code> if not
*/
Flowable<Boolean> clearExpire();
/**
* Get remaining time to live of object in milliseconds.
*
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Flowable<Long> remainTimeToLive();
}

@ -0,0 +1,291 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import io.reactivex.Flowable;
/**
*
* @author Nikita Koksharov
*
*/
public interface RKeysRx {
/**
* Move object to another database
*
* @param name of object
* @param database - Redis database number
* @return <code>true</code> if key was moved else <code>false</code>
*/
Flowable<Boolean> move(String name, int database);
/**
* Transfer object from source Redis instance to destination Redis instance
*
* @param name of object
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
*/
Flowable<Void> migrate(String name, String host, int port, int database, long timeout);
/**
* Copy object from source Redis instance to destination Redis instance
*
* @param name of object
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
*/
Flowable<Void> copy(String name, String host, int port, int database, long timeout);
/**
* Set a timeout for object. After the timeout has expired,
* the key will automatically be deleted.
*
* @param name of object
* @param timeToLive - timeout before object will be deleted
* @param timeUnit - timeout time unit
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Flowable<Boolean> expire(String name, long timeToLive, TimeUnit timeUnit);
/**
* Set an expire date for object. When expire date comes
* the key will automatically be deleted.
*
* @param name of object
* @param timestamp - expire date in milliseconds (Unix timestamp)
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Flowable<Boolean> expireAt(String name, long timestamp);
/**
* Clear an expire timeout or expire date for object.
*
* @param name of object
* @return <code>true</code> if timeout was removed
* <code>false</code> if object does not exist or does not have an associated timeout
*/
Flowable<Boolean> clearExpire(String name);
/**
* Rename object with <code>oldName</code> to <code>newName</code>
* only if new key is not exists
*
* @param oldName - old name of object
* @param newName - new name of object
* @return <code>true</code> if object has been renamed successfully and <code>false</code> otherwise
*/
Flowable<Boolean> renamenx(String oldName, String newName);
/**
* Rename current object key to <code>newName</code>
*
* @param currentName - current name of object
* @param newName - new name of object
*/
Flowable<Void> rename(String currentName, String newName);
/**
* Remaining time to live of Redisson object that has a timeout
*
* @param name of key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Flowable<Long> remainTimeToLive(String name);
/**
* Update the last access time of an object.
*
* @param names of keys
* @return count of objects were touched
*/
Flowable<Long> touch(String... names);
/**
* Checks if provided keys exist
*
* @param names of keys
* @return amount of existing keys
*/
Flowable<Long> countExists(String... names);
/**
* Get Redis object type by key
*
* @param key - name of key
* @return type of key
*/
Flowable<RType> getType(String key);
/**
* Load keys in incrementally iterate mode. Keys traversed with SCAN operation.
* Each SCAN operation loads up to 10 keys per request.
*
* @return keys
*/
Flowable<String> getKeys();
/**
* Load keys in incrementally iterate mode. Keys traversed with SCAN operation.
* Each SCAN operation loads up to <code>count</code> keys per request.
*
* @param count - keys loaded per request to Redis
* @return keys
*/
Flowable<String> getKeys(int count);
/**
* Find keys by pattern and load it in incrementally iterate mode.
* Keys traversed with SCAN operation.
* Each SCAN operation loads up to 10 keys per request.
* <p>
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @return keys
*/
Flowable<String> getKeysByPattern(String pattern);
/**
* Get all keys by pattern using iterator.
* Keys traversed with SCAN operation. Each SCAN operation loads
* up to <code>count</code> keys per request.
* <p>
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @param count - keys loaded per request to Redis
* @return keys
*/
Flowable<String> getKeysByPattern(String pattern, int count);
/**
* Get hash slot identifier for key.
* Available for cluster nodes only.
*
* Uses <code>KEYSLOT</code> Redis command.
*
* @param key - name of key
* @return slot number
*/
Flowable<Integer> getSlot(String key);
/**
* Find keys by key search pattern by one Redis call.
*
* Uses <code>KEYS</code> Redis command.
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @return collection of keys
*/
Flowable<Collection<String>> findKeysByPattern(String pattern);
/**
* Get random key
*
* Uses <code>RANDOM_KEY</code> Redis command.
*
* @return random key
*/
Flowable<String> randomKey();
/**
* Delete multiple objects by a key pattern.
*
* Uses Lua script.
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @return deleted objects amount
*/
Flowable<Long> deleteByPattern(String pattern);
/**
* Delete multiple objects by name.
*
* Uses <code>DEL</code> Redis command.
*
* @param keys - object names
* @return deleted objects amount
*/
Flowable<Long> delete(String ... keys);
/**
* Delete multiple objects by name.
* Actual removal will happen later asynchronously.
* <p>
* Requires Redis 4.0+
*
* @param keys of objects
* @return number of removed keys
*/
Flowable<Long> unlink(String ... keys);
/**
* Returns the number of keys in the currently-selected database
*
* @return count of keys
*/
Flowable<Long> count();
/**
* Delete all the keys of the currently selected database
*
* Uses <code>FLUSHDB</code> Redis command.
*
* @return void
*/
Flowable<Void> flushdb();
/**
* Delete all the keys of all the existing databases
*
* Uses <code>FLUSHALL</code> Redis command.
*
* @return void
*/
Flowable<Void> flushall();
}

@ -0,0 +1,236 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import io.reactivex.Flowable;
/**
* <p>Map-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} method.
* And therefore has an complex lua-scripts inside.</p>
*
* <p>Current redis implementation doesnt have map entry eviction functionality.
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.eviction.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RMapCacheRx<K, V> extends RMapRx<K, V> {
/**
* Sets max size of the map.
* Superfluous elements are evicted using LRU algorithm.
*
* @param maxSize - max size
* @return void
*/
Flowable<Void> setMaxSize(int maxSize);
/**
* Tries to set max size of the map.
* Superfluous elements are evicted using LRU algorithm.
*
* @param maxSize - max size
* @return <code>true</code> if max size has been successfully set, otherwise <code>false</code>.
*/
Flowable<Boolean> trySetMaxSize(int maxSize);
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p>
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
* @return previous associated value
*/
Flowable<V> putIfAbsent(K key, V value, long ttl, TimeUnit unit);
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p>
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
Flowable<V> putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
* @return previous associated value
*/
Flowable<V> put(K key, V value, long ttl, TimeUnit unit);
/**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
Flowable<V> put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit)}
* as it not returns previous value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
*
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Flowable<Boolean> fastPut(K key, V value, long ttl, TimeUnit unit);
/**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit, long, TimeUnit)}
* as it not returns previous value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Flowable<Boolean> fastPut(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p>
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* Works faster than usual {@link #putIfAbsent(Object, Object, long, TimeUnit, long, TimeUnit)}
* as it not returns previous value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash
*/
Flowable<Boolean> fastPutIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Returns the number of entries in cache.
* This number can reflects expired entries too
* due to non realtime cleanup process.
*
*/
@Override
Flowable<Integer> size();
/**
* Remaining time to live of map entry associated with a <code>key</code>.
*
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Flowable<Long> remainTimeToLive(K key);
}

@ -0,0 +1,441 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import io.reactivex.Flowable;
/**
* map functions
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RMapRx<K, V> extends RExpirableRx {
/**
* Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}.
*
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.
* @param parallelism - parallelism level, used to increase speed of process execution
* @return void
*/
Flowable<Void> loadAll(boolean replaceExistingValues, int parallelism);
/**
* Loads map entries using {@link org.redisson.api.map.MapLoader} whose keys are listed in defined <code>keys</code> parameter.
*
* @param keys - map keys
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.
* @param parallelism - parallelism level, used to increase speed of process execution
* @return void
*/
Flowable<Void> loadAll(Set<? extends K> keys, boolean replaceExistingValues, int parallelism);
/**
* Returns size of value mapped by key in bytes
*
* @param key - map key
* @return size of value
*/
Flowable<Integer> valueSize(K key);
/**
* Gets a map slice contained the mappings with defined <code>keys</code>
* by one operation.
* <p>
* If map doesn't contain value/values for specified key/keys and {@link MapLoader} is defined
* then value/values will be loaded in read-through mode.
* <p>
* The returned map is <b>NOT</b> backed by the original map.
*
* @param keys - map keys
* @return Map slice
*/
Flowable<Map<K, V>> getAll(Set<K> keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map mappings to be stored in this map
* @return void
*/
Flowable<Void> putAll(Map<? extends K, ? extends V> map);
Flowable<V> addAndGet(K key, Number value);
Flowable<Boolean> containsValue(Object value);
Flowable<Boolean> containsKey(Object key);
Flowable<Integer> size();
/**
* Removes <code>keys</code> from map by one operation in async manner.
* <p>
* Works faster than <code>{@link #remove(Object, Object)}</code> but doesn't return
* the value associated with <code>key</code>.
* <p>
* If {@link MapWriter} is defined then <code>keys</code>are deleted in write-through mode.
*
* @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
*/
Flowable<Long> fastRemove(K ... keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
* <p>
* Works faster than <code>{@link #put(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key is a new one in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Flowable<Boolean> fastPut(K key, V value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* Works faster than <code>{@link #putIfAbsent(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key is a new one in the hash and value was set.
* <code>false</code> if key already exists in the hash and change hasn't been made.
*/
Flowable<Boolean> fastPutIfAbsent(K key, V value);
/**
* Read all keys at once
*
* @return keys
*/
Flowable<Set<K>> readAllKeySet();
/**
* Read all values at once
*
* @return values
*/
Flowable<Collection<V>> readAllValues();
/**
* Read all map entries at once
*
* @return entries
*/
Flowable<Set<Entry<K, V>>> readAllEntrySet();
/**
* Read all map as local instance at once
*
* @return map
*/
Flowable<Map<K, V>> readAllMap();
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
* <p>
* If map doesn't contain value for specified key and {@link MapLoader} is defined
* then value will be loaded in read-through mode.
*
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or
* {@code null} if this map contains no mapping for the key
*/
Flowable<V> get(K key);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
*/
Flowable<V> put(K key, V value);
/**
* Removes <code>key</code> from map and returns associated value in async manner.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @return deleted value or <code>null</code> if there wasn't any association
*/
Flowable<V> remove(K key);
/**
* Replaces previous value with a new <code>value</code> associated with the <code>key</code>.
* If there wasn't any association before then method returns <code>null</code>.
* <p>
* If {@link MapWriter} is defined then new <code>value</code>is written in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
* or <code>null</code> if there wasn't any association and change hasn't been made
*/
Flowable<V> replace(K key, V value);
/**
* Replaces previous <code>oldValue</code> with a <code>newValue</code> associated with the <code>key</code>.
* If previous value doesn't exist or equal to <code>oldValue</code> then method returns <code>false</code>.
* <p>
* If {@link MapWriter} is defined then <code>newValue</code>is written in write-through mode.
*
* @param key - map key
* @param oldValue - map old value
* @param newValue - map new value
* @return <code>true</code> if value has been replaced otherwise <code>false</code>.
*/
Flowable<Boolean> replace(K key, V oldValue, V newValue);
/**
* Removes <code>key</code> from map only if it associated with <code>value</code>.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if map entry has been replaced otherwise <code>false</code>.
*/
Flowable<Boolean> remove(Object key, Object value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is a new one in the hash and value was set.
* Previous value if key already exists in the hash and change hasn't been made.
*/
Flowable<V> putIfAbsent(K key, V value);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllEntrySet()
*
* @return iterator
*/
Flowable<Map.Entry<K, V>> entryIterator();
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllEntrySet()
*
* @param count - size of entries batch
* @return iterator
*/
Flowable<Map.Entry<K, V>> entryIterator(int count);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllEntrySet()
*
* @param pattern - key pattern
* @return iterator
*/
Flowable<Map.Entry<K, V>> entryIterator(String pattern);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllEntrySet()
*
* @param pattern - key pattern
* @param count - size of entries batch
* @return iterator
*/
Flowable<Map.Entry<K, V>> entryIterator(String pattern, int count);
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllValues()
*
* @return iterator
*/
Flowable<V> valueIterator();
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllValues()
*
* @param count - size of values batch
* @return iterator
*/
Flowable<V> valueIterator(int count);
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllValues()
*
* @param pattern - key pattern
* @return iterator
*/
Flowable<V> valueIterator(String pattern);
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is defined by <code>count</code> param.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllValues()
*
* @param pattern - key pattern
* @param count - size of values batch
* @return iterator
*/
Flowable<V> valueIterator(String pattern, int count);
/**
* Returns iterator over key set of this map.
* Keys are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllKeySet()
*
* @return iterator
*/
Flowable<K> keyIterator();
/**
* Returns iterator over key set of this map.
* Keys are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllKeySet()
*
* @param count - size of keys batch
* @return iterator
*/
Flowable<K> keyIterator(int count);
/**
* Returns iterator over key set of this map.
* If <code>pattern</code> is not null then only keys match this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllKeySet()
*
* @param pattern - key pattern
* @return iterator
*/
Flowable<K> keyIterator(String pattern);
/**
* Returns iterator over key set of this map.
* If <code>pattern</code> is not null then only keys match this pattern are loaded.
* Keys are loaded in batch. Batch size is defined by <code>count</code> param.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllKeySet()
*
* @param pattern - key pattern
* @param count - size of keys batch
* @return iterator
*/
Flowable<K> keyIterator(String pattern, int count);
}

@ -0,0 +1,159 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import io.reactivex.Flowable;
/**
* Base interface for all Redisson objects
*
* @author Nikita Koksharov
*
*/
public interface RObjectRx {
String getName();
Codec getCodec();
/**
* Restores object using its state returned by {@link #dump()} method.
*
* @param state - state of object
* @return void
*/
Flowable<Void> restore(byte[] state);
/**
* Restores object using its state returned by {@link #dump()} method and set time to live for it.
*
* @param state - state of object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
* @return void
*/
Flowable<Void> restore(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Restores and replaces object if it already exists.
*
* @param state - state of the object
* @return void
*/
Flowable<Void> restoreAndReplace(byte[] state);
/**
* Restores and replaces object if it already exists and set time to live for it.
*
* @param state - state of the object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
* @return void
*/
Flowable<Void> restoreAndReplace(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Returns dump of object
*
* @return dump
*/
Flowable<byte[]> dump();
/**
* Update the last access time of an object.
*
* @return <code>true</code> if object was touched else <code>false</code>
*/
Flowable<Boolean> touch();
/**
* Delete the objects.
* Actual removal will happen later asynchronously.
* <p>
* Requires Redis 4.0+
*
* @return <code>true</code> if it was exist and deleted else <code>false</code>
*/
Flowable<Boolean> unlink();
/**
* Copy object from source Redis instance to destination Redis instance
*
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
* @return void
*/
Flowable<Void> copy(String host, int port, int database, long timeout);
/**
* Transfer a object from a source Redis instance to a destination Redis instance
* in mode
*
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
* @return void
*/
Flowable<Void> migrate(String host, int port, int database, long timeout);
/**
* Move object to another database in mode
*
* @param database - number of Redis database
* @return <code>true</code> if key was moved <code>false</code> if not
*/
Flowable<Boolean> move(int database);
/**
* Delete object in mode
*
* @return <code>true</code> if object was deleted <code>false</code> if not
*/
Flowable<Boolean> delete();
/**
* Rename current object key to <code>newName</code>
* in mode
*
* @param newName - new name of object
* @return void
*/
Flowable<Void> rename(String newName);
/**
* Rename current object key to <code>newName</code>
* in mode only if new key is not exists
*
* @param newName - new name of object
* @return <code>true</code> if object has been renamed successfully and <code>false</code> otherwise
*/
Flowable<Boolean> renamenx(String newName);
/**
* Check object existence
*
* @return <code>true</code> if object exists and <code>false</code> otherwise
*/
Flowable<Boolean> isExists();
}

@ -0,0 +1,155 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Set;
import io.reactivex.Flowable;
/**
* Async set functions
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public interface RSetRx<V> extends RCollectionRx<V>, RSortableRx<Set<V>> {
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return iterator
*/
Flowable<V> iterator(int count);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Flowable<V> iterator(String pattern, int count);
/**
* Returns iterator over elements in this set matches <code>pattern</code>.
*
* @param pattern - search pattern
* @return iterator
*/
Flowable<V> iterator(String pattern);
/**
* Removes and returns random elements from set
* in async mode
*
* @param amount of random values
* @return random values
*/
Flowable<Set<V>> removeRandom(int amount);
/**
* Removes and returns random element from set
* in async mode
*
* @return value
*/
Flowable<V> removeRandom();
/**
* Returns random element from set
* in async mode
*
* @return value
*/
Flowable<V> random();
/**
* Move a member from this set to the given destination set in async mode.
*
* @param destination the destination set
* @param member the member to move
* @return true if the element is moved, false if the element is not a
* member of this set or no operation was performed
*/
Flowable<Boolean> move(String destination, V member);
/**
* Read all elements at once
*
* @return values
*/
Flowable<Set<V>> readAll();
/**
* Union sets specified by name and write to current set.
* If current set already exists, it is overwritten.
*
* @param names - name of sets
* @return size of union
*/
Flowable<Long> union(String... names);
/**
* Union sets specified by name with current set.
* Without current set state change.
*
* @param names - name of sets
* @return size of union
*/
Flowable<Set<V>> readUnion(String... names);
/**
* Diff sets specified by name and write to current set.
* If current set already exists, it is overwritten.
*
* @param names - name of sets
* @return size of diff
*/
Flowable<Long> diff(String... names);
/**
* Diff sets specified by name with current set.
* Without current set state change.
*
* @param names - name of sets
* @return values
*/
Flowable<Set<V>> readDiff(String... names);
/**
* Intersection sets specified by name and write to current set.
* If current set already exists, it is overwritten.
*
* @param names - name of sets
* @return size of intersection
*/
Flowable<Long> intersection(String... names);
/**
* Intersection sets specified by name with current set.
* Without current set state change.
*
* @param names - name of sets
* @return values
*/
Flowable<Set<V>> readIntersection(String... names);
}

@ -0,0 +1,159 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import io.reactivex.Single;
/**
*
* @author Nikita Koksharov
*
* @param <V> object type
*/
public interface RSortableRx<V> {
/**
* Read data in sorted view
*
* @param order for sorted data
* @return sorted collection
*/
Single<V> readSorted(SortOrder order);
/**
* Read data in sorted view
*
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return sorted collection
*/
Single<V> readSorted(SortOrder order, int offset, int count);
/**
* Read data in sorted view
*
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @return sorted collection
*/
Single<V> readSorted(String byPattern, SortOrder order);
/**
* Read data in sorted view
*
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return sorted collection
*/
Single<V> readSorted(String byPattern, SortOrder order, int offset, int count);
/**
* Read data in sorted view
*
* @param <T> object type
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @return sorted collection
*/
<T> Single<Collection<T>> readSorted(String byPattern, List<String> getPatterns, SortOrder order);
/**
* Read data in sorted view
*
* @param <T> object type
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return sorted collection
*/
<T> Single<Collection<T>> readSorted(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param order for sorted data
* @return length of sorted data
*/
Single<Integer> sortTo(String destName, SortOrder order);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return length of sorted data
*/
Single<Integer> sortTo(String destName, SortOrder order, int offset, int count);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @return length of sorted data
*/
Single<Integer> sortTo(String destName, String byPattern, SortOrder order);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return length of sorted data
*/
Single<Integer> sortTo(String destName, String byPattern, SortOrder order, int offset, int count);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @return length of sorted data
*/
Single<Integer> sortTo(String destName, String byPattern, List<String> getPatterns, SortOrder order);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return length of sorted data
*/
Single<Integer> sortTo(String destName, String byPattern, List<String> getPatterns, SortOrder order, int offset, int count);
}

@ -23,7 +23,7 @@ import org.redisson.config.Config;
/**
* Main Redisson interface for access
* to all redisson objects with reactive interface.
* to all redisson objects with Reactive interface.
*
* @author Nikita Koksharov
*

@ -0,0 +1,670 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
/**
* Main Redisson interface for access
* to all redisson objects with RxJava2 interface.
*
* @author Nikita Koksharov
*
*/
public interface RedissonRxClient {
// /**
// * Returns stream instance by <code>name</code>
// * <p>
// * Requires <b>Redis 5.0.0 and higher.</b>
// *
// * @param <K> type of key
// * @param <V> type of value
// * @param name of stream
// * @return RStream object
// */
// <K, V> RStreamReactive<K, V> getStream(String name);
//
// /**
// * Returns stream instance by <code>name</code>
// * using provided <code>codec</code> for entries.
// * <p>
// * Requires <b>Redis 5.0.0 and higher.</b>
// *
// * @param <K> type of key
// * @param <V> type of value
// * @param name - name of stream
// * @param codec - codec for entry
// * @return RStream object
// */
// <K, V> RStreamReactive<K, V> getStream(String name, Codec codec);
//
// /**
// * Returns geospatial items holder instance by <code>name</code>.
// *
// * @param <V> type of value
// * @param name - name of object
// * @return Geo object
// */
// <V> RGeoReactive<V> getGeo(String name);
//
// /**
// * Returns geospatial items holder instance by <code>name</code>
// * using provided codec for geospatial members.
// *
// * @param <V> type of value
// * @param name - name of object
// * @param codec - codec for value
// * @return Geo object
// */
// <V> RGeoReactive<V> getGeo(String name, Codec codec);
//
// /**
// * Returns rate limiter instance by <code>name</code>
// *
// * @param name of rate limiter
// * @return RateLimiter object
// */
// RRateLimiterReactive getRateLimiter(String name);
//
// /**
// * Returns semaphore instance by name
// *
// * @param name - name of object
// * @return Semaphore object
// */
// RSemaphoreReactive getSemaphore(String name);
//
// /**
// * Returns semaphore instance by name.
// * Supports lease time parameter for each acquired permit.
// *
// * @param name - name of object
// * @return PermitExpirableSemaphore object
// */
// RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name);
//
// /**
// * Returns readWriteLock instance by name.
// *
// * @param name - name of object
// * @return Lock object
// */
// RReadWriteLockReactive getReadWriteLock(String name);
//
// /**
// * Returns lock instance by name.
// * <p>
// * Implements a <b>fair</b> locking so it guarantees an acquire order by threads.
// *
// * @param name - name of object
// * @return Lock object
// */
// RLockReactive getFairLock(String name);
//
// /**
// * Returns lock instance by name.
// * <p>
// * Implements a <b>non-fair</b> locking so doesn't guarantee an acquire order by threads.
// *
// * @param name - name of object
// * @return Lock object
// */
// RLockReactive getLock(String name);
//
// /**
// * Returns set-based cache instance by <code>name</code>.
// * Supports value eviction with a given TTL value.
// *
// * <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
// *
// * @param <V> type of values
// * @param name - name of object
// * @return SetCache object
// */
// <V> RSetCacheReactive<V> getSetCache(String name);
//
// /**
// * Returns set-based cache instance by <code>name</code>.
// * Supports value eviction with a given TTL value.
// *
// * <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
// *
// * @param <V> type of values
// * @param name - name of object
// * @param codec - codec for values
// * @return SetCache object
// */
// <V> RSetCacheReactive<V> getSetCache(String name, Codec codec);
//
/**
* Returns map-based cache instance by name
* using provided codec for both cache keys and values.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.
*
* @param <K> type of keys
* @param <V> type of values
* @param name - name of object
* @param codec - codec for values
* @return MapCache object
*/
<K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec);
/**
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec, MapOptions)}.
*
* @param <K> type of key
* @param <V> type of value
* @param name - object name
* @param codec - codec for keys and values
* @param options - map options
* @return MapCache object
*/
<K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options);
/**
* Returns map-based cache instance by name.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String)}.
*
* @param <K> type of keys
* @param <V> type of values
* @param name - name of object
* @return MapCache object
*/
<K, V> RMapCacheRx<K, V> getMapCache(String name);
/**
* Returns map-based cache instance by name.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, MapOptions)}.</p>
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param options - map options
* @return MapCache object
*/
<K, V> RMapCacheRx<K, V> getMapCache(String name, MapOptions<K, V> options);
/**
* Returns object holder instance by name
*
* @param <V> type of value
* @param name - name of object
* @return Bucket object
*/
<V> RBucketRx<V> getBucket(String name);
/**
* Returns object holder instance by name
* using provided codec for object.
*
* @param <V> type of value
* @param name - name of object
* @param codec - codec for value
* @return Bucket object
*/
<V> RBucketRx<V> getBucket(String name, Codec codec);
// /**
// * Returns a list of object holder instances by a key pattern
// *
// * @param <V> type of value
// * @param pattern - pattern for name of buckets
// * @return list of buckets
// */
// <V> List<RBucketReactive<V>> findBuckets(String pattern);
//
// /**
// * Returns HyperLogLog instance by name.
// *
// * @param <V> type of values
// * @param name - name of object
// * @return HyperLogLog object
// */
// <V> RHyperLogLogReactive<V> getHyperLogLog(String name);
//
// /**
// * Returns HyperLogLog instance by name
// * using provided codec for hll objects.
// *
// * @param <V> type of values
// * @param name - name of object
// * @param codec - codec of values
// * @return HyperLogLog object
// */
// <V> RHyperLogLogReactive<V> getHyperLogLog(String name, Codec codec);
//
// /**
// * Returns list instance by name.
// *
// * @param <V> type of values
// * @param name - name of object
// * @return List object
// */
// <V> RListReactive<V> getList(String name);
//
// /**
// * Returns list instance by name
// * using provided codec for list objects.
// *
// * @param <V> type of values
// * @param name - name of object
// * @param codec - codec for values
// * @return List object
// */
// <V> RListReactive<V> getList(String name, Codec codec);
//
// /**
// * Returns List based Multimap instance by name.
// *
// * @param <K> type of key
// * @param <V> type of value
// * @param name - name of object
// * @return ListMultimap object
// */
// <K, V> RListMultimapReactive<K, V> getListMultimap(String name);
//
// /**
// * Returns List based Multimap instance by name
// * using provided codec for both map keys and values.
// *
// * @param <K> type of key
// * @param <V> type of value
// * @param name - name of object
// * @param codec - codec for keys and values
// * @return RListMultimapReactive object
// */
// <K, V> RListMultimapReactive<K, V> getListMultimap(String name, Codec codec);
//
// /**
// * Returns Set based Multimap instance by name.
// *
// * @param <K> type of key
// * @param <V> type of value
// * @param name - name of object
// * @return SetMultimap object
// */
// <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name);
//
// /**
// * Returns Set based Multimap instance by name
// * using provided codec for both map keys and values.
// *
// * @param <K> type of key
// * @param <V> type of value
// * @param name - name of object
// * @param codec - codec for keys and values
// * @return SetMultimap object
// */
// <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name, Codec codec);
//
/**
* Returns map instance by name.
*
* @param <K> type of keys
* @param <V> type of values
* @param name - name of object
* @return Map object
*/
<K, V> RMapRx<K, V> getMap(String name);
/**
* Returns map instance by name.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param options - map options
* @return Map object
*/
<K, V> RMapRx<K, V> getMap(String name, MapOptions<K, V> options);
/**
* Returns map instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of keys
* @param <V> type of values
* @param name - name of object
* @param codec - codec for keys and values
* @return Map object
*/
<K, V> RMapRx<K, V> getMap(String name, Codec codec);
/**
* Returns map instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param codec - codec for keys and values
* @param options - map options
* @return Map object
*/
<K, V> RMapRx<K, V> getMap(String name, Codec codec, MapOptions<K, V> options);
/**
* Returns set instance by name.
*
* @param <V> type of values
* @param name - name of object
* @return Set object
*/
<V> RSetRx<V> getSet(String name);
/**
* Returns set instance by name
* using provided codec for set objects.
*
* @param <V> type of values
* @param name - name of set
* @param codec - codec for values
* @return Set object
*/
<V> RSetRx<V> getSet(String name, Codec codec);
// /**
// * Returns Redis Sorted Set instance by name.
// * This sorted set sorts objects by object score.
// *
// * @param <V> type of values
// * @param name of scored sorted set
// * @return ScoredSortedSet object
// */
// <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name);
//
// /**
// * Returns Redis Sorted Set instance by name
// * using provided codec for sorted set objects.
// * This sorted set sorts objects by object score.
// *
// * @param <V> type of values
// * @param name - name of scored sorted set
// * @param codec - codec for values
// * @return ScoredSortedSet object
// */
// <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec codec);
//
// /**
// * Returns String based Redis Sorted Set instance by name
// * All elements are inserted with the same score during addition,
// * in order to force lexicographical ordering
// *
// * @param name - name of object
// * @return LexSortedSet object
// */
// RLexSortedSetReactive getLexSortedSet(String name);
//
// /**
// * Returns topic instance by name.
// *
// * @param <M> type of message
// * @param name - name of object
// * @return Topic object
// */
// <M> RTopicReactive<M> getTopic(String name);
//
// /**
// * Returns topic instance by name
// * using provided codec for messages.
// *
// * @param <M> type of message
// * @param name - name of object
// * @param codec - codec for message
// * @return Topic object
// */
// <M> RTopicReactive<M> getTopic(String name, Codec codec);
//
// /**
// * Returns topic instance satisfies by pattern name.
// *
// * Supported glob-style patterns:
// * h?llo subscribes to hello, hallo and hxllo
// * h*llo subscribes to hllo and heeeello
// * h[ae]llo subscribes to hello and hallo, but not hillo
// *
// * @param <M> type of message
// * @param pattern of the topic
// * @return PatternTopic object
// */
// <M> RPatternTopicReactive<M> getPatternTopic(String pattern);
//
// /**
// * Returns topic instance satisfies by pattern name
// * using provided codec for messages.
// *
// * Supported glob-style patterns:
// * h?llo subscribes to hello, hallo and hxllo
// * h*llo subscribes to hllo and heeeello
// * h[ae]llo subscribes to hello and hallo, but not hillo
// *
// * @param <M> type of message
// * @param pattern of the topic
// * @param codec - codec for message
// * @return PatternTopic object
// */
// <M> RPatternTopicReactive<M> getPatternTopic(String pattern, Codec codec);
//
// /**
// * Returns queue instance by name.
// *
// * @param <V> type of values
// * @param name - name of object
// * @return Queue object
// */
// <V> RQueueReactive<V> getQueue(String name);
//
// /**
// * Returns queue instance by name
// * using provided codec for queue objects.
// *
// * @param <V> type of values
// * @param name - name of object
// * @param codec - codec for values
// * @return Queue object
// */
// <V> RQueueReactive<V> getQueue(String name, Codec codec);
//
// /**
// * Returns blocking queue instance by name.
// *
// * @param <V> type of values
// * @param name - name of object
// * @return BlockingQueue object
// */
// <V> RBlockingQueueReactive<V> getBlockingQueue(String name);
//
// /**
// * Returns blocking queue instance by name
// * using provided codec for queue objects.
// *
// * @param <V> type of values
// * @param name - name of object
// * @param codec - code for values
// * @return BlockingQueue object
// */
// <V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec);
//
// /**
// * Returns unbounded blocking deque instance by name.
// *
// * @param <V> type of value
// * @param name - name of object
// * @return BlockingDeque object
// */
// <V> RBlockingDequeReactive<V> getBlockingDeque(String name);
//
// /**
// * Returns unbounded blocking deque instance by name
// * using provided codec for deque objects.
// *
// * @param <V> type of value
// * @param name - name of object
// * @param codec - deque objects codec
// * @return BlockingDeque object
// */
// <V> RBlockingDequeReactive<V> getBlockingDeque(String name, Codec codec);
//
// /**
// * Returns deque instance by name.
// *
// * @param <V> type of values
// * @param name - name of object
// * @return Deque object
// */
// <V> RDequeReactive<V> getDeque(String name);
//
// /**
// * Returns deque instance by name
// * using provided codec for deque objects.
// *
// * @param <V> type of values
// * @param name - name of object
// * @param codec - coded for values
// * @return Deque object
// */
// <V> RDequeReactive<V> getDeque(String name, Codec codec);
//
// /**
// * Returns "atomic long" instance by name.
// *
// * @param name of the "atomic long"
// * @return AtomicLong object
// */
// RAtomicLongReactive getAtomicLong(String name);
//
// /**
// * Returns "atomic double" instance by name.
// *
// * @param name of the "atomic double"
// * @return AtomicLong object
// */
// RAtomicDoubleReactive getAtomicDouble(String name);
//
// /**
// * Returns bitSet instance by name.
// *
// * @param name - name of object
// * @return BitSet object
// */
// RBitSetReactive getBitSet(String name);
//
// /**
// * Returns script operations object
// *
// * @return Script object
// */
// RScriptReactive getScript();
//
// /**
// * Creates transaction with <b>READ_COMMITTED</b> isolation level.
// *
// * @param options - transaction configuration
// * @return Transaction object
// */
// RTransactionReactive createTransaction(TransactionOptions options);
//
// /**
// * Return batch object which executes group of
// * command in pipeline.
// *
// * See <a href="http://redis.io/topics/pipelining">http://redis.io/topics/pipelining</a>
// *
// * @param options - batch configuration
// * @return Batch object
// */
// RBatchReactive createBatch(BatchOptions options);
//
// /*
// * Use createBatch(BatchOptions)
// */
// @Deprecated
// RBatchReactive createBatch();
/**
* Returns keys operations.
* Each of Redis/Redisson object associated with own key
*
* @return Keys object
*/
RKeysRx getKeys();
/**
* Shuts down Redisson instance <b>NOT</b> Redis server
*/
void shutdown();
/**
* Allows to get configuration provided
* during Redisson instance creation. Further changes on
* this object not affect Redisson instance.
*
* @return Config object
*/
Config getConfig();
/**
* Returns the CodecProvider instance
*
* @return CodecProvider object
*/
ReferenceCodecProvider getCodecProvider();
/**
* Get Redis nodes group for server operations
*
* @return NodesGroup object
*/
NodesGroup<Node> getNodesGroup();
/**
* Get Redis cluster nodes group for server operations
*
* @return NodesGroup object
*/
NodesGroup<ClusterNode> getClusterNodesGroup();
/**
* Returns {@code true} if this Redisson instance has been shut down.
*
* @return <code>true</code> if this Redisson instance has been shut down otherwise <code>false</code>
*/
boolean isShutdown();
/**
* Returns {@code true} if this Redisson instance was started to be shutdown
* or was shutdown {@link #isShutdown()} already.
*
* @return <code>true</code> if this Redisson instance was started to be shutdown
* or was shutdown {@link #isShutdown()} already otherwise <code>false</code>
*/
boolean isShuttingDown();
}

@ -521,7 +521,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
public <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt,
final boolean ignoreRedirect, final RFuture<RedisConnection> connFuture) {
if (mainPromise.isCancelled()) {

@ -152,7 +152,7 @@ public class CommandBatchService extends CommandAsyncService {
}
@Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
if (executed.get()) {
throw new IllegalStateException("Batch already has been executed!");

@ -62,7 +62,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
}
@Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture);
}

@ -0,0 +1,91 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise;
import io.reactivex.Flowable;
import reactor.rx.action.support.DefaultSubscriber;
/**
*
* @author Nikita Koksharov
*
*/
public class CommandRxBatchService extends CommandRxService {
private final CommandBatchService batchService;
private final Queue<Publisher<?>> publishers = new ConcurrentLinkedQueue<Publisher<?>>();
public CommandRxBatchService(ConnectionManager connectionManager) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager);
}
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
Flowable<R> flowable = super.flowable(supplier);
publishers.add(flowable);
return flowable;
}
public <R> Flowable<R> superReactive(Callable<RFuture<R>> supplier) {
return super.flowable(supplier);
}
@Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture);
}
public RFuture<BatchResult<?>> executeAsync(BatchOptions options) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
}
return batchService.executeAsync(options);
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
batchService.enableRedissonReferenceSupport(redissonReactive);
return super.enableRedissonReferenceSupport(redissonReactive);
}
}

@ -0,0 +1,34 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import io.reactivex.Flowable;
/**
*
* @author Nikita Koksharov
*
*/
public interface CommandRxExecutor extends CommandAsyncExecutor {
<R> Flowable<R> flowable(Callable<RFuture<R>> supplier);
}

@ -0,0 +1,66 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/**
*
* @author Nikita Koksharov
*
*/
public class CommandRxService extends CommandAsyncService implements CommandRxExecutor {
public CommandRxService(ConnectionManager connectionManager) {
super(connectionManager);
}
@Override
public <R> Flowable<R> flowable(final Callable<RFuture<R>> supplier) {
final ReplayProcessor<R> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
supplier.call().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
if (future.getNow() != null) {
p.onNext(future.getNow());
}
p.onComplete();
}
});
}
});
}
}

@ -0,0 +1,94 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.LongConsumer;
import io.reactivex.internal.operators.flowable.FlowableInternalHelper;
import io.reactivex.processors.ReplayProcessor;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public abstract class PublisherAdder<V> {
public abstract RFuture<Boolean> add(Object o);
public Flowable<Boolean> addAll(Publisher<? extends V> c) {
final Flowable<? extends V> cc = Flowable.fromPublisher(c);
final ReplayProcessor<Boolean> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
final AtomicBoolean completed = new AtomicBoolean();
final AtomicLong values = new AtomicLong();
final AtomicBoolean lastSize = new AtomicBoolean();
cc.subscribe(new Consumer<V>() {
@Override
public void accept(V t) throws Exception {
values.getAndIncrement();
add(t).addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
if (future.getNow()) {
lastSize.set(true);
}
if (values.decrementAndGet() == 0 && completed.get()) {
p.onNext(lastSize.get());
p.onComplete();
}
}
});
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable t) throws Exception {
p.onError(t);
}
}, new Action() {
@Override
public void run() throws Exception {
completed.set(true);
if (values.get() == 0) {
p.onNext(lastSize.get());
p.onComplete();
}
}
}, FlowableInternalHelper.RequestMax.INSTANCE);
}
});
}
}

@ -0,0 +1,126 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.RedissonKeys;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.connection.MasterSlaveEntry;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonKeysRx {
private final CommandRxExecutor commandExecutor;
private final RedissonKeys instance;
public RedissonKeysRx(CommandRxExecutor commandExecutor) {
instance = new RedissonKeys(commandExecutor);
this.commandExecutor = commandExecutor;
}
public Flowable<String> getKeysByPattern(String pattern) {
return getKeysByPattern(pattern, 10);
}
public Flowable<String> getKeysByPattern(String pattern, int count) {
List<Publisher<String>> publishers = new ArrayList<Publisher<String>>();
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
publishers.add(createKeysIterator(entry, pattern, count));
}
return Flowable.merge(publishers);
}
private Publisher<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) {
final ReplayProcessor<String> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
private RedisClient client;
private List<String> firstValues;
private long nextIterPos;
private long currentIndex;
@Override
public void accept(long value) {
currentIndex = value;
nextValues();
}
protected void nextValues() {
instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count).addListener(new FutureListener<ListScanResult<Object>>() {
@Override
public void operationComplete(Future<ListScanResult<Object>> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
ListScanResult<Object> res = future.get();
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = (List<String>)(Object)res.getValues();
} else if (res.getValues().equals(firstValues)) {
p.onComplete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (Object val : res.getValues()) {
p.onNext((String)val);
currentIndex--;
if (currentIndex == 0) {
p.onComplete();
return;
}
}
if (nextIterPos == -1) {
p.onComplete();
currentIndex = 0;
}
if (currentIndex == 0) {
return;
}
nextValues();
}
});
}
});
}
}

@ -0,0 +1,97 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.Map;
import java.util.Map.Entry;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMapCache;
/**
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class RedissonMapCacheRx<K, V> {
private final RedissonMapCache<K, V> instance;
public RedissonMapCacheRx(RedissonMapCache<K, V> instance) {
this.instance = instance;
}
public Publisher<Map.Entry<K, V>> entryIterator() {
return entryIterator(null);
}
public Publisher<Map.Entry<K, V>> entryIterator(int count) {
return entryIterator(null, count);
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern) {
return entryIterator(pattern, 10);
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, Map.Entry<K, V>>(instance, pattern, count).create();
}
public Publisher<V> valueIterator() {
return valueIterator(null);
}
public Publisher<V> valueIterator(String pattern) {
return valueIterator(pattern, 10);
}
public Publisher<V> valueIterator(int count) {
return valueIterator(null, count);
}
public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, V>(instance, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
}
}.create();
}
public Publisher<K> keyIterator() {
return keyIterator(null);
}
public Publisher<K> keyIterator(String pattern) {
return keyIterator(pattern, 10);
}
public Publisher<K> keyIterator(int count) {
return keyIterator(null, count);
}
public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, K>(instance, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
}
}.create();
}
}

@ -0,0 +1,99 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.Map;
import java.util.Map.Entry;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
* and {@link java.util.Map}
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class RedissonMapRx<K, V> {
private final RedissonMap<K, V> instance;
public RedissonMapRx(RedissonMap<K, V> instance) {
this.instance = instance;
}
public Publisher<Map.Entry<K, V>> entryIterator() {
return entryIterator(null);
}
public Publisher<Entry<K, V>> entryIterator(int count) {
return entryIterator(null, count);
}
public Publisher<Entry<K, V>> entryIterator(String pattern) {
return entryIterator(pattern, 10);
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, Map.Entry<K, V>>(instance, pattern, count).create();
}
public Publisher<V> valueIterator() {
return valueIterator(null);
}
public Publisher<V> valueIterator(String pattern) {
return valueIterator(pattern, 10);
}
public Publisher<V> valueIterator(int count) {
return valueIterator(null, count);
}
public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, V>(instance, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
}
}.create();
}
public Publisher<K> keyIterator() {
return keyIterator(null);
}
public Publisher<K> keyIterator(String pattern) {
return keyIterator(pattern, 10);
}
public Publisher<K> keyIterator(int count) {
return keyIterator(null, count);
}
public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, K>(instance, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
}
}.create();
}
}

@ -0,0 +1,137 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.AbstractMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.RedissonMap;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
* @param <M> entry type
*/
public class RedissonMapRxIterator<K, V, M> {
private final RedissonMap<K, V> map;
private final String pattern;
private final int count;
public RedissonMapRxIterator(RedissonMap<K, V> map, String pattern, int count) {
this.map = map;
this.pattern = pattern;
this.count = count;
}
public Flowable<M> create() {
final ReplayProcessor<M> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
private long nextIterPos;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
@Override
public void accept(long value) throws Exception {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues();
completed = false;
}
};
protected void nextValues() {
map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).addListener(new FutureListener<MapScanResult<Object, Object>>() {
@Override
public void operationComplete(Future<MapScanResult<Object, Object>> future)
throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
if (finished) {
client = null;
nextIterPos = 0;
return;
}
MapScanResult<Object, Object> res = future.getNow();
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Entry<Object, Object> entry : res.getMap().entrySet()) {
M val = getValue(entry);
p.onNext(val);
elementsRead.incrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
p.onComplete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
p.onComplete();
}
if (finished || completed) {
return;
}
nextValues();
}
});
}
});
}
protected boolean tryAgain() {
return false;
}
M getValue(final Entry<Object, Object> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey(), (V)entry.getValue()) {
@Override
public V setValue(V value) {
return map.put((K) entry.getKey(), value);
}
};
}
}

@ -0,0 +1,72 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import io.reactivex.Flowable;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public class RedissonSetRx<V> {
private final RSet<V> instance;
public RedissonSetRx(RSet<V> instance) {
this.instance = instance;
}
public Flowable<Boolean> addAll(Publisher<? extends V> c) {
return new PublisherAdder<Object>() {
@Override
public RFuture<Boolean> add(Object e) {
return instance.addAsync((V)e);
}
}.addAll(c);
}
public Flowable<V> iterator(int count) {
return iterator(null, count);
}
public Flowable<V> iterator(String pattern) {
return iterator(pattern, 10);
}
public Flowable<V> iterator(final String pattern, final int count) {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, pattern, count);
}
}.create();
}
public Publisher<V> iterator() {
return iterator(null, 10);
}
}

@ -0,0 +1,137 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.RFuture;
/**
*
* @author Nikita Koksharov
*
*/
public class RxProxyBuilder {
private static class CacheKey {
Method method;
Class<?> instanceClass;
public CacheKey(Method method, Class<?> instanceClass) {
super();
this.method = method;
this.instanceClass = instanceClass;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((instanceClass == null) ? 0 : instanceClass.hashCode());
result = prime * result + ((method == null) ? 0 : method.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CacheKey other = (CacheKey) obj;
if (instanceClass == null) {
if (other.instanceClass != null)
return false;
} else if (!instanceClass.equals(other.instanceClass))
return false;
if (method == null) {
if (other.method != null)
return false;
} else if (!method.equals(other.method))
return false;
return true;
}
}
private static final ConcurrentMap<CacheKey, Method> methodsMapping = new ConcurrentHashMap<CacheKey, Method>();
public static <T> T create(CommandRxExecutor commandExecutor, Object instance, Class<T> clazz) {
return create(commandExecutor, instance, null, clazz);
}
public static <T> T create(final CommandRxExecutor commandExecutor, final Object instance, final Object implementation, final Class<T> clazz) {
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, final Object[] args) throws Throwable {
CacheKey key = new CacheKey(method, instance.getClass());
Method instanceMethod = methodsMapping.get(key);
if (instanceMethod == null) {
if (implementation != null) {
try {
instanceMethod = implementation.getClass().getMethod(method.getName(), method.getParameterTypes());
} catch (NoSuchMethodException e) {
try {
instanceMethod = instance.getClass().getMethod(method.getName() + "Async", method.getParameterTypes());
} catch (Exception e2) {
instanceMethod = instance.getClass().getMethod(method.getName(), method.getParameterTypes());
}
}
} else {
try {
instanceMethod = instance.getClass().getMethod(method.getName() + "Async", method.getParameterTypes());
} catch (NoSuchMethodException e) {
instanceMethod = instance.getClass().getMethod(method.getName(), method.getParameterTypes());
}
}
methodsMapping.put(key, instanceMethod);
}
final Method mm = instanceMethod;
if (instanceMethod.getName().endsWith("Async")) {
return commandExecutor.flowable(new Callable<RFuture<Object>>() {
@SuppressWarnings("unchecked")
@Override
public RFuture<Object> call() {
try {
return (RFuture<Object>) mm.invoke(instance, args);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
});
}
if (implementation != null
&& instanceMethod.getDeclaringClass() == implementation.getClass()) {
return instanceMethod.invoke(implementation, args);
}
return instanceMethod.invoke(instance, args);
}
};
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, handler);
}
}

@ -0,0 +1,110 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public abstract class SetRxIterator<V> {
public Flowable<V> create() {
final ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
private long nextIterPos;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
@Override
public void accept(long value) {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues();
completed = false;
}
}
protected void nextValues() {
scanIterator(client, nextIterPos).addListener(new FutureListener<ListScanResult<Object>>() {
@Override
public void operationComplete(Future<ListScanResult<Object>> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
if (finished) {
client = null;
nextIterPos = 0;
return;
}
ListScanResult<Object> res = future.getNow();
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Object val : res.getValues()) {
p.onNext((V)val);
elementsRead.incrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
p.onComplete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
p.onComplete();
}
if (finished || completed) {
return;
}
nextValues();
}
});
}
});
}
protected boolean tryAgain() {
return false;
}
protected abstract RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos);
}

@ -0,0 +1,74 @@
package org.redisson.rx;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.reactivestreams.Publisher;
import org.redisson.BaseTest;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.api.RCollectionRx;
import org.redisson.api.RedissonRxClient;
import org.redisson.config.Config;
import io.reactivex.Flowable;
public abstract class BaseRxTest {
protected RedissonRxClient redisson;
protected static RedissonRxClient defaultRedisson;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = createInstance();
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
defaultRedisson.shutdown();
RedisRunner.shutDownDefaultRedisServerInstance();
}
@Before
public void before() throws IOException, InterruptedException {
if (redisson == null) {
redisson = defaultRedisson;
}
sync(redisson.getKeys().flushall());
}
// public static <V> Iterable<V> sync(RScoredSortedSetReactive<V> list) {
// return Streams.create(list.iterator()).toList().poll();
// }
public static <V> Iterable<V> sync(RCollectionRx<V> list) {
return list.iterator().toList().blockingGet();
}
public static <V> Iterator<V> toIterator(Publisher<V> pub) {
return Flowable.fromPublisher(pub).toList().blockingGet().iterator();
}
public static <V> Iterable<V> toIterable(Publisher<V> pub) {
return Flowable.fromPublisher(pub).toList().blockingGet();
}
public static <V> V sync(Flowable<V> ob) {
try {
return ob.blockingSingle();
} catch (NoSuchElementException e) {
return null;
}
}
public static RedissonRxClient createInstance() {
Config config = BaseTest.createConfig();
return Redisson.createRx(config);
}
}

@ -0,0 +1,64 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Iterator;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RBucketRx;
import org.redisson.api.RMapRx;
public class RedissonKeysRxTest extends BaseRxTest {
@Test
public void testKeysIterablePattern() {
sync(redisson.getBucket("test1").set("someValue"));
sync(redisson.getBucket("test2").set("someValue"));
sync(redisson.getBucket("test12").set("someValue"));
Iterator<String> iterator = toIterator(redisson.getKeys().getKeysByPattern("test?"));
for (; iterator.hasNext();) {
String key = iterator.next();
assertThat(key).isIn("test1", "test2");
}
}
@Test
public void testRandomKey() {
RBucketRx<String> bucket = redisson.getBucket("test1");
sync(bucket.set("someValue1"));
RBucketRx<String> bucket2 = redisson.getBucket("test2");
sync(bucket2.set("someValue2"));
assertThat(sync(redisson.getKeys().randomKey())).isIn("test1", "test2");
sync(redisson.getKeys().delete("test1"));
Assert.assertEquals("test2", sync(redisson.getKeys().randomKey()));
sync(redisson.getKeys().flushdb());
Assert.assertNull(sync(redisson.getKeys().randomKey()));
}
@Test
public void testDeleteByPattern() {
RBucketRx<String> bucket = redisson.getBucket("test1");
sync(bucket.set("someValue"));
RMapRx<String, String> map = redisson.getMap("test2");
sync(map.fastPut("1", "2"));
Assert.assertEquals(2, sync(redisson.getKeys().deleteByPattern("test?")).intValue());
}
@Test
public void testMassDelete() {
RBucketRx<String> bucket = redisson.getBucket("test");
sync(bucket.set("someValue"));
RMapRx<String, String> map = redisson.getMap("map2");
sync(map.fastPut("1", "2"));
Assert.assertEquals(2, sync(redisson.getKeys().delete("test", "map2")).intValue());
Assert.assertEquals(0, sync(redisson.getKeys().delete("test", "map2")).intValue());
}
}

@ -0,0 +1,428 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RMapCacheRx;
import org.redisson.api.RMapReactive;
import org.redisson.api.RMapRx;
import org.redisson.codec.MsgPackJacksonCodec;
public class RedissonMapCacheRxTest extends BaseRxTest {
public static class SimpleKey implements Serializable {
private String key;
public SimpleKey() {
}
public SimpleKey(String field) {
this.key = field;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
@Override
public String toString() {
return "key: " + key;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleKey other = (SimpleKey) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
return true;
}
}
public static class SimpleValue implements Serializable {
private String value;
public SimpleValue() {
}
public SimpleValue(String field) {
this.value = field;
}
public void setValue(String field) {
this.value = field;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "value: " + value;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleValue other = (SimpleValue) obj;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
}
@Test
public void testGetAll() throws InterruptedException {
RMapCacheRx<Integer, Integer> map = redisson.getMapCache("getAll");
sync(map.put(1, 100));
sync(map.put(2, 200, 1, TimeUnit.SECONDS));
sync(map.put(3, 300, 1, TimeUnit.SECONDS));
sync(map.put(4, 400));
Map<Integer, Integer> filtered = sync(map.getAll(new HashSet<Integer>(Arrays.asList(2, 3, 5))));
Map<Integer, Integer> expectedMap = new HashMap<Integer, Integer>();
expectedMap.put(2, 200);
expectedMap.put(3, 300);
Assert.assertEquals(expectedMap, filtered);
Thread.sleep(1000);
Map<Integer, Integer> filteredAgain = sync(map.getAll(new HashSet<Integer>(Arrays.asList(2, 3, 5))));
Assert.assertTrue(filteredAgain.isEmpty());
}
@Test
public void testGetAllWithStringKeys() {
RMapCacheRx<String, Integer> map = redisson.getMapCache("getAllStrings");
sync(map.put("A", 100));
sync(map.put("B", 200));
sync(map.put("C", 300));
sync(map.put("D", 400));
Map<String, Integer> filtered = sync(map.getAll(new HashSet<String>(Arrays.asList("B", "C", "E"))));
Map<String, Integer> expectedMap = new HashMap<String, Integer>();
expectedMap.put("B", 200);
expectedMap.put("C", 300);
Assert.assertEquals(expectedMap, filtered);
}
@Test
public void testExpiredIterator() throws InterruptedException {
RMapCacheRx<String, String> cache = redisson.getMapCache("simple");
sync(cache.put("0", "8"));
sync(cache.put("1", "6", 1, TimeUnit.SECONDS));
sync(cache.put("2", "4", 3, TimeUnit.SECONDS));
sync(cache.put("3", "2", 4, TimeUnit.SECONDS));
sync(cache.put("4", "4", 1, TimeUnit.SECONDS));
Thread.sleep(1000);
assertThat(toIterator(cache.keyIterator())).containsOnly("0", "2", "3");
}
@Test
public void testExpire() throws InterruptedException {
RMapCacheRx<String, String> cache = redisson.getMapCache("simple");
sync(cache.put("0", "8", 1, TimeUnit.SECONDS));
sync(cache.expire(100, TimeUnit.MILLISECONDS));
Thread.sleep(500);
Assert.assertEquals(0, sync(cache.size()).intValue());
}
@Test
public void testExpireAt() throws InterruptedException {
RMapCacheRx<String, String> cache = redisson.getMapCache("simple");
sync(cache.put("0", "8", 1, TimeUnit.SECONDS));
sync(cache.expireAt(System.currentTimeMillis() + 100));
Thread.sleep(500);
Assert.assertEquals(0, sync(cache.size()).intValue());
}
@Test
public void testClearExpire() throws InterruptedException {
RMapCacheRx<String, String> cache = redisson.getMapCache("simple");
sync(cache.put("0", "8", 1, TimeUnit.SECONDS));
sync(cache.expireAt(System.currentTimeMillis() + 100));
sync(cache.clearExpire());
Thread.sleep(500);
Assert.assertEquals(1, sync(cache.size()).intValue());
}
@Test
public void testRemove() {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("33"), new SimpleValue("44")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
sync(map.remove(new SimpleKey("33")));
sync(map.remove(new SimpleKey("5")));
Assert.assertEquals(1, sync(map.size()).intValue());
}
@Test
public void testPutAll() {
RMapCacheRx<Integer, String> map = redisson.getMapCache("simple");
sync(map.put(1, "1"));
sync(map.put(2, "2"));
sync(map.put(3, "3"));
Map<Integer, String> joinMap = new HashMap<Integer, String>();
joinMap.put(4, "4");
joinMap.put(5, "5");
joinMap.put(6, "6");
sync(map.putAll(joinMap));
assertThat(toIterable(map.keyIterator())).containsOnly(1, 2, 3, 4, 5, 6);
}
@Test
public void testContainsValue() throws InterruptedException {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple31", new MsgPackJacksonCodec());
Assert.assertFalse(sync(map.containsValue(new SimpleValue("34"))));
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS));
Assert.assertTrue(sync(map.containsValue(new SimpleValue("44"))));
Assert.assertFalse(sync(map.containsValue(new SimpleValue("34"))));
Thread.sleep(1000);
Assert.assertFalse(sync(map.containsValue(new SimpleValue("44"))));
}
@Test
public void testContainsKey() throws InterruptedException {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS));
Assert.assertTrue(sync(map.containsKey(new SimpleKey("33"))));
Assert.assertFalse(sync(map.containsKey(new SimpleKey("34"))));
Thread.sleep(1000);
Assert.assertFalse(sync(map.containsKey(new SimpleKey("33"))));
}
@Test
public void testRemoveValue() {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2"), 1, TimeUnit.SECONDS));
boolean res = sync(map.remove(new SimpleKey("1"), new SimpleValue("2")));
Assert.assertTrue(res);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertNull(val1);
Assert.assertEquals(0, sync(map.size()).intValue());
}
@Test
public void testScheduler() throws InterruptedException {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple", new MsgPackJacksonCodec());
Assert.assertNull(sync(map.get(new SimpleKey("33"))));
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 5, TimeUnit.SECONDS));
Thread.sleep(11000);
Assert.assertEquals(0, sync(map.size()).intValue());
}
@Test
public void testPutGet() throws InterruptedException {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple01", new MsgPackJacksonCodec());
Assert.assertNull(sync(map.get(new SimpleKey("33"))));
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 2, TimeUnit.SECONDS));
SimpleValue val1 = sync(map.get(new SimpleKey("33")));
Assert.assertEquals("44", val1.getValue());
Thread.sleep(1000);
Assert.assertEquals(1, sync(map.size()).intValue());
SimpleValue val2 = sync(map.get(new SimpleKey("33")));
Assert.assertEquals("44", val2.getValue());
Assert.assertEquals(1, sync(map.size()).intValue());
Thread.sleep(1000);
Assert.assertNull(sync(map.get(new SimpleKey("33"))));
}
@Test
public void testPutIfAbsent() throws Exception {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
SimpleKey key = new SimpleKey("1");
SimpleValue value = new SimpleValue("2");
sync(map.put(key, value));
Assert.assertEquals(value, sync(map.putIfAbsent(key, new SimpleValue("3"), 1, TimeUnit.SECONDS)));
Assert.assertEquals(value, sync(map.get(key)));
sync(map.putIfAbsent(new SimpleKey("4"), new SimpleValue("4"), 1, TimeUnit.SECONDS));
Assert.assertEquals(new SimpleValue("4"), sync(map.get(new SimpleKey("4"))));
Thread.sleep(1000);
Assert.assertNull(sync(map.get(new SimpleKey("4"))));
SimpleKey key1 = new SimpleKey("2");
SimpleValue value1 = new SimpleValue("4");
Assert.assertNull(sync(map.putIfAbsent(key1, value1, 2, TimeUnit.SECONDS)));
Assert.assertEquals(value1, sync(map.get(key1)));
}
@Test
public void testSize() {
RMapCacheRx<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("3"), new SimpleValue("4")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
Assert.assertEquals(3, sync(map.size()).intValue());
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("3"), new SimpleValue("4")));
Assert.assertEquals(3, sync(map.size()).intValue());
sync(map.put(new SimpleKey("1"), new SimpleValue("21")));
sync(map.put(new SimpleKey("3"), new SimpleValue("41")));
Assert.assertEquals(3, sync(map.size()).intValue());
sync(map.put(new SimpleKey("51"), new SimpleValue("6")));
Assert.assertEquals(4, sync(map.size()).intValue());
sync(map.remove(new SimpleKey("3")));
Assert.assertEquals(3, sync(map.size()).intValue());
}
@Test
public void testEmptyRemove() {
RMapCacheRx<Integer, Integer> map = redisson.getMapCache("simple");
assertThat(sync(map.remove(1, 3))).isEqualTo(Boolean.FALSE);
sync(map.put(4, 5));
assertThat(sync(map.remove(4, 5))).isEqualTo(Boolean.TRUE);
}
@Test
public void testKeyIterator() {
RMapRx<Integer, Integer> map = redisson.getMapCache("simple");
sync(map.put(1, 0));
sync(map.put(3, 5));
sync(map.put(4, 6));
sync(map.put(7, 8));
List<Integer> keys = new ArrayList<Integer>(Arrays.asList(1, 3, 4, 7));
for (Iterator<Integer> iterator = toIterator(map.keyIterator()); iterator.hasNext();) {
Integer value = iterator.next();
if (!keys.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, keys.size());
}
@Test
public void testValueIterator() {
RMapRx<Integer, Integer> map = redisson.getMapCache("simple");
sync(map.put(1, 0));
sync(map.put(3, 5));
sync(map.put(4, 6));
sync(map.put(7, 8));
List<Integer> values = new ArrayList<Integer>(Arrays.asList(0, 5, 6, 8));
for (Iterator<Integer> iterator = toIterator(map.valueIterator()); iterator.hasNext();) {
Integer value = iterator.next();
if (!values.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, values.size());
}
@Test
public void testEquals() {
RMapCacheRx<String, String> map = redisson.getMapCache("simple");
sync(map.put("1", "7"));
sync(map.put("2", "4"));
sync(map.put("3", "5"));
Map<String, String> testMap = new HashMap<String, String>();
testMap.put("1", "7");
testMap.put("2", "4");
testMap.put("3", "5");
Assert.assertEquals(map, testMap);
Assert.assertEquals(testMap.hashCode(), map.hashCode());
}
}

@ -0,0 +1,517 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RMapRx;
public class RedissonMapRxTest extends BaseRxTest {
public static class SimpleKey implements Serializable {
private String key;
public SimpleKey() {
}
public SimpleKey(String field) {
this.key = field;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
@Override
public String toString() {
return "key: " + key;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleKey other = (SimpleKey) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
return true;
}
}
public static class SimpleValue implements Serializable {
private String value;
public SimpleValue() {
}
public SimpleValue(String field) {
this.value = field;
}
public void setValue(String field) {
this.value = field;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "value: " + value;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleValue other = (SimpleValue) obj;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
}
@Test
public void testAddAndGet() throws InterruptedException {
RMapRx<Integer, Integer> map = redisson.getMap("getAll");
sync(map.put(1, 100));
Integer res = sync(map.addAndGet(1, 12));
Assert.assertEquals(112, (int)res);
res = sync(map.get(1));
Assert.assertEquals(112, (int)res);
RMapRx<Integer, Double> map2 = redisson.getMap("getAll2");
sync(map2.put(1, new Double(100.2)));
Double res2 = sync(map2.addAndGet(1, new Double(12.1)));
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
res2 = sync(map2.get(1));
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
RMapRx<String, Integer> mapStr = redisson.getMap("mapStr");
assertThat(sync(mapStr.put("1", 100))).isNull();
assertThat(sync(mapStr.addAndGet("1", 12))).isEqualTo(112);
assertThat(sync(mapStr.get("1"))).isEqualTo(112);
}
@Test
public void testGetAll() {
RMapRx<Integer, Integer> map = redisson.getMap("getAll");
sync(map.put(1, 100));
sync(map.put(2, 200));
sync(map.put(3, 300));
sync(map.put(4, 400));
Map<Integer, Integer> filtered = sync(map.getAll(new HashSet<Integer>(Arrays.asList(2, 3, 5))));
Map<Integer, Integer> expectedMap = new HashMap<Integer, Integer>();
expectedMap.put(2, 200);
expectedMap.put(3, 300);
Assert.assertEquals(expectedMap, filtered);
}
@Test
public void testGetAllWithStringKeys() {
RMapRx<String, Integer> map = redisson.getMap("getAllStrings");
sync(map.put("A", 100));
sync(map.put("B", 200));
sync(map.put("C", 300));
sync(map.put("D", 400));
Map<String, Integer> filtered = sync(map.getAll(new HashSet<String>(Arrays.asList("B", "C", "E"))));
Map<String, Integer> expectedMap = new HashMap<String, Integer>();
expectedMap.put("B", 200);
expectedMap.put("C", 300);
Assert.assertEquals(expectedMap, filtered);
}
@Test
public void testInteger() {
RMapRx<Integer, Integer> map = redisson.getMap("test_int");
sync(map.put(1, 2));
sync(map.put(3, 4));
Assert.assertEquals(2, sync(map.size()).intValue());
Integer val = sync(map.get(1));
Assert.assertEquals(2, val.intValue());
Integer val2 = sync(map.get(3));
Assert.assertEquals(4, val2.intValue());
}
@Test
public void testLong() {
RMapRx<Long, Long> map = redisson.getMap("test_long");
sync(map.put(1L, 2L));
sync(map.put(3L, 4L));
Assert.assertEquals(2, sync(map.size()).intValue());
Long val = sync(map.get(1L));
Assert.assertEquals(2L, val.longValue());
Long val2 = sync(map.get(3L));
Assert.assertEquals(4L, val2.longValue());
}
@Test
public void testSimpleTypes() {
RMapRx<Integer, String> map = redisson.getMap("simple12");
sync(map.put(1, "12"));
sync(map.put(2, "33"));
sync(map.put(3, "43"));
String val = sync(map.get(2));
Assert.assertEquals("33", val);
}
@Test
public void testRemove() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("33"), new SimpleValue("44")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
sync(map.remove(new SimpleKey("33")));
sync(map.remove(new SimpleKey("5")));
Assert.assertEquals(1, sync(map.size()).intValue());
}
@Test
public void testEquals() {
RMapRx<String, String> map = redisson.getMap("simple");
sync(map.put("1", "7"));
sync(map.put("2", "4"));
sync(map.put("3", "5"));
Map<String, String> testMap = new HashMap<String, String>();
testMap.put("1", "7");
testMap.put("2", "4");
testMap.put("3", "5");
Assert.assertEquals(map, testMap);
Assert.assertEquals(map.hashCode(), testMap.hashCode());
}
@Test
public void testPutAll() {
RMapRx<Integer, String> map = redisson.getMap("simple");
sync(map.put(1, "1"));
sync(map.put(2, "2"));
sync(map.put(3, "3"));
Map<Integer, String> joinMap = new HashMap<Integer, String>();
joinMap.put(4, "4");
joinMap.put(5, "5");
joinMap.put(6, "6");
sync(map.putAll(joinMap));
assertThat(toIterable(map.keyIterator())).contains(1, 2, 3, 4, 5, 6);
}
@Test
public void testContainsValue() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("33"), new SimpleValue("44")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
Assert.assertTrue(sync(map.containsValue(new SimpleValue("2"))));
Assert.assertFalse(sync(map.containsValue(new SimpleValue("441"))));
Assert.assertFalse(sync(map.containsValue(new SimpleKey("5"))));
}
@Test
public void testContainsKey() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("33"), new SimpleValue("44")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
Assert.assertTrue(sync(map.containsKey(new SimpleKey("33"))));
Assert.assertFalse(sync(map.containsKey(new SimpleKey("34"))));
}
@Test
public void testRemoveValue() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
boolean size = sync(map.remove(new SimpleKey("1"), new SimpleValue("2")));
Assert.assertTrue(size);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertNull(val1);
Assert.assertEquals(0, sync(map.size()).intValue());
}
@Test
public void testRemoveValueFail() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
boolean removed = sync(map.remove(new SimpleKey("2"), new SimpleValue("1")));
Assert.assertFalse(removed);
boolean size2 = sync(map.remove(new SimpleKey("1"), new SimpleValue("3")));
Assert.assertFalse(size2);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertEquals("2", val1.getValue());
}
@Test
public void testReplaceOldValueFail() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
boolean res = sync(map.replace(new SimpleKey("1"), new SimpleValue("43"), new SimpleValue("31")));
Assert.assertFalse(res);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertEquals("2", val1.getValue());
}
@Test
public void testReplaceOldValueSuccess() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
boolean res = sync(map.replace(new SimpleKey("1"), new SimpleValue("2"), new SimpleValue("3")));
Assert.assertTrue(res);
boolean res1 = sync(map.replace(new SimpleKey("1"), new SimpleValue("2"), new SimpleValue("3")));
Assert.assertFalse(res1);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertEquals("3", val1.getValue());
}
@Test
public void testReplaceValue() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
SimpleValue res = sync(map.replace(new SimpleKey("1"), new SimpleValue("3")));
Assert.assertEquals("2", res.getValue());
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertEquals("3", val1.getValue());
}
@Test
public void testReplace() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("33"), new SimpleValue("44")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
SimpleValue val1 = sync(map.get(new SimpleKey("33")));
Assert.assertEquals("44", val1.getValue());
sync(map.put(new SimpleKey("33"), new SimpleValue("abc")));
SimpleValue val2 = sync(map.get(new SimpleKey("33")));
Assert.assertEquals("abc", val2.getValue());
}
@Test
public void testPutGet() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("33"), new SimpleValue("44")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
SimpleValue val1 = sync(map.get(new SimpleKey("33")));
Assert.assertEquals("44", val1.getValue());
SimpleValue val2 = sync(map.get(new SimpleKey("5")));
Assert.assertEquals("6", val2.getValue());
}
@Test
public void testPutIfAbsent() throws Exception {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
SimpleKey key = new SimpleKey("1");
SimpleValue value = new SimpleValue("2");
sync(map.put(key, value));
Assert.assertEquals(value, sync(map.putIfAbsent(key, new SimpleValue("3"))));
Assert.assertEquals(value, sync(map.get(key)));
SimpleKey key1 = new SimpleKey("2");
SimpleValue value1 = new SimpleValue("4");
Assert.assertNull(sync(map.putIfAbsent(key1, value1)));
Assert.assertEquals(value1, sync(map.get(key1)));
}
@Test
public void testSize() {
RMapRx<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("3"), new SimpleValue("4")));
sync(map.put(new SimpleKey("5"), new SimpleValue("6")));
Assert.assertEquals(3, sync(map.size()).intValue());
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
sync(map.put(new SimpleKey("3"), new SimpleValue("4")));
Assert.assertEquals(3, sync(map.size()).intValue());
sync(map.put(new SimpleKey("1"), new SimpleValue("21")));
sync(map.put(new SimpleKey("3"), new SimpleValue("41")));
Assert.assertEquals(3, sync(map.size()).intValue());
sync(map.put(new SimpleKey("51"), new SimpleValue("6")));
Assert.assertEquals(4, sync(map.size()).intValue());
sync(map.remove(new SimpleKey("3")));
Assert.assertEquals(3, sync(map.size()).intValue());
}
@Test
public void testEmptyRemove() {
RMapRx<Integer, Integer> map = redisson.getMap("simple");
assertThat(sync(map.remove(1, 3))).isFalse();
sync(map.put(4, 5));
assertThat(sync(map.remove(4, 5))).isTrue();
}
@Test
public void testFastRemoveAsync() throws InterruptedException, ExecutionException {
RMapRx<Integer, Integer> map = redisson.getMap("simple");
sync(map.put(1, 3));
sync(map.put(3, 5));
sync(map.put(4, 6));
sync(map.put(7, 8));
Assert.assertEquals((Long) 3L, sync(map.fastRemove(1, 3, 7)));
Assert.assertEquals(1, sync(map.size()).intValue());
}
@Test
public void testKeyIterator() {
RMapRx<Integer, Integer> map = redisson.getMap("simple");
sync(map.put(1, 0));
sync(map.put(3, 5));
sync(map.put(4, 6));
sync(map.put(7, 8));
List<Integer> keys = new ArrayList<Integer>(Arrays.asList(1, 3, 4, 7));
for (Iterator<Integer> iterator = toIterator(map.keyIterator()); iterator.hasNext();) {
Integer value = iterator.next();
if (!keys.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, keys.size());
}
@Test
public void testValueIterator() {
RMapRx<Integer, Integer> map = redisson.getMap("simple");
sync(map.put(1, 0));
sync(map.put(3, 5));
sync(map.put(4, 6));
sync(map.put(7, 8));
List<Integer> values = new ArrayList<Integer>(Arrays.asList(0, 5, 6, 8));
for (Iterator<Integer> iterator = toIterator(map.valueIterator()); iterator.hasNext();) {
Integer value = iterator.next();
if (!values.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, values.size());
}
@Test
public void testFastPut() throws Exception {
RMapRx<Integer, Integer> map = redisson.getMap("simple");
Assert.assertTrue(sync(map.fastPut(1, 2)));
Assert.assertFalse(sync(map.fastPut(1, 3)));
Assert.assertEquals(1, sync(map.size()).intValue());
}
@Test
public void testFastRemoveEmpty() throws Exception {
RMapRx<Integer, Integer> map = redisson.getMap("simple");
sync(map.put(1, 3));
Assert.assertEquals(0, sync(map.fastRemove()).intValue());
Assert.assertEquals(1, sync(map.size()).intValue());
}
public static class SimpleObjectWithoutDefaultConstructor {
private String testField;
SimpleObjectWithoutDefaultConstructor(String testField) {
this.testField = testField;
}
public String getTestField() {
return testField;
}
public void setTestField(String testField) {
this.testField = testField;
}
}
}

@ -0,0 +1,270 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.TestObject;
import org.redisson.api.RSetRx;
public class RedissonSetRxTest extends BaseRxTest {
public static class SimpleBean implements Serializable {
private Long lng;
public Long getLng() {
return lng;
}
public void setLng(Long lng) {
this.lng = lng;
}
}
@Test
public void testAddAllReactive() {
RSetRx<Integer> list = redisson.getSet("set");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
RSetRx<Integer> list2 = redisson.getSet("set2");
Assert.assertEquals(true, sync(list2.addAll(list.iterator())));
Assert.assertEquals(5, sync(list2.size()).intValue());
}
@Test
public void testRemoveRandom() {
RSetRx<Integer> set = redisson.getSet("simple");
sync(set.add(1));
sync(set.add(2));
sync(set.add(3));
assertThat(sync(set.removeRandom())).isIn(1, 2, 3);
assertThat(sync(set.removeRandom())).isIn(1, 2, 3);
assertThat(sync(set.removeRandom())).isIn(1, 2, 3);
Assert.assertNull(sync(set.removeRandom()));
}
@Test
public void testRandom() {
RSetRx<Integer> set = redisson.getSet("simple");
sync(set.add(1));
sync(set.add(2));
sync(set.add(3));
assertThat(sync(set.random())).isIn(1, 2, 3);
assertThat(sync(set.random())).isIn(1, 2, 3);
assertThat(sync(set.random())).isIn(1, 2, 3);
assertThat(sync(set)).containsOnly(1, 2, 3);
}
@Test
public void testAddBean() throws InterruptedException, ExecutionException {
SimpleBean sb = new SimpleBean();
sb.setLng(1L);
RSetRx<SimpleBean> set = redisson.getSet("simple");
sync(set.add(sb));
Assert.assertEquals(sb.getLng(), toIterator(set.iterator()).next().getLng());
}
@Test
public void testAddLong() throws InterruptedException, ExecutionException {
Long sb = 1l;
RSetRx<Long> set = redisson.getSet("simple_longs");
sync(set.add(sb));
for (Long l : sync(set)) {
Assert.assertEquals(sb.getClass(), l.getClass());
}
}
@Test
public void testRemove() throws InterruptedException, ExecutionException {
RSetRx<Integer> set = redisson.getSet("simple");
sync(set.add(1));
sync(set.add(3));
sync(set.add(7));
Assert.assertTrue(sync(set.remove(1)));
Assert.assertFalse(sync(set.contains(1)));
assertThat(sync(set)).containsExactly(3, 7);
Assert.assertFalse(sync(set.remove(1)));
assertThat(sync(set)).containsExactly(3, 7);
sync(set.remove(3));
Assert.assertFalse(sync(set.contains(3)));
assertThat(sync(set)).containsExactly(7);
}
@Test
public void testIteratorSequence() {
RSetRx<Long> set = redisson.getSet("set");
for (int i = 0; i < 1000; i++) {
sync(set.add(Long.valueOf(i)));
}
Set<Long> setCopy = new HashSet<Long>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Long.valueOf(i));
}
checkIterator(set, setCopy);
}
private void checkIterator(RSetRx<Long> set, Set<Long> setCopy) {
for (Iterator<Long> iterator = toIterator(set.iterator()); iterator.hasNext();) {
Long value = iterator.next();
if (!setCopy.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, setCopy.size());
}
@Test
public void testLong() {
RSetRx<Long> set = redisson.getSet("set");
sync(set.add(1L));
sync(set.add(2L));
assertThat(sync(set)).containsOnly(1L, 2L);
}
@Test
public void testRetainAll() {
RSetRx<Integer> set = redisson.getSet("set");
for (int i = 0; i < 20000; i++) {
sync(set.add(i));
}
Assert.assertTrue(sync(set.retainAll(Arrays.asList(1, 2))));
assertThat(sync(set)).containsExactlyInAnyOrder(1, 2);
Assert.assertEquals(2, sync(set.size()).intValue());
}
@Test
public void testContainsAll() {
RSetRx<Integer> set = redisson.getSet("set");
for (int i = 0; i < 200; i++) {
sync(set.add(i));
}
Assert.assertTrue(sync(set.containsAll(Collections.emptyList())));
Assert.assertTrue(sync(set.containsAll(Arrays.asList(30, 11))));
Assert.assertFalse(sync(set.containsAll(Arrays.asList(30, 711, 11))));
}
@Test
public void testContains() {
RSetRx<TestObject> set = redisson.getSet("set");
sync(set.add(new TestObject("1", "2")));
sync(set.add(new TestObject("1", "2")));
sync(set.add(new TestObject("2", "3")));
sync(set.add(new TestObject("3", "4")));
sync(set.add(new TestObject("5", "6")));
Assert.assertTrue(sync(set.contains(new TestObject("2", "3"))));
Assert.assertTrue(sync(set.contains(new TestObject("1", "2"))));
Assert.assertFalse(sync(set.contains(new TestObject("1", "9"))));
}
@Test
public void testDuplicates() {
RSetRx<TestObject> set = redisson.getSet("set");
sync(set.add(new TestObject("1", "2")));
sync(set.add(new TestObject("1", "2")));
sync(set.add(new TestObject("2", "3")));
sync(set.add(new TestObject("3", "4")));
sync(set.add(new TestObject("5", "6")));
Assert.assertEquals(4, sync(set.size()).intValue());
}
@Test
public void testSize() {
RSetRx<Integer> set = redisson.getSet("set");
sync(set.add(1));
sync(set.add(2));
sync(set.add(3));
sync(set.add(3));
sync(set.add(4));
sync(set.add(5));
sync(set.add(5));
Assert.assertEquals(5, sync(set.size()).intValue());
}
@Test
public void testRetainAllEmpty() {
RSetRx<Integer> set = redisson.getSet("set");
sync(set.add(1));
sync(set.add(2));
sync(set.add(3));
sync(set.add(4));
sync(set.add(5));
Assert.assertTrue(sync(set.retainAll(Collections.<Integer>emptyList())));
Assert.assertEquals(0, sync(set.size()).intValue());
}
@Test
public void testRetainAllNoModify() {
RSetRx<Integer> set = redisson.getSet("set");
sync(set.add(1));
sync(set.add(2));
Assert.assertFalse(sync(set.retainAll(Arrays.asList(1, 2)))); // nothing changed
assertThat(sync(set)).containsExactly(1, 2);
}
@Test
public void testMove() throws Exception {
RSetRx<Integer> set = redisson.getSet("set");
RSetRx<Integer> otherSet = redisson.getSet("otherSet");
sync(set.add(1));
sync(set.add(2));
Assert.assertTrue(sync(set.move("otherSet", 1)));
Assert.assertEquals(1, sync(set.size()).intValue());
assertThat(sync(set)).containsExactly(2);
Assert.assertEquals(1, sync(otherSet.size()).intValue());
assertThat(sync(otherSet)).containsExactly(1);
}
@Test
public void testMoveNoMember() throws Exception {
RSetRx<Integer> set = redisson.getSet("set");
RSetRx<Integer> otherSet = redisson.getSet("otherSet");
sync(set.add(1));
Assert.assertFalse(sync(set.move("otherSet", 2)));
Assert.assertEquals(1, sync(set.size()).intValue());
Assert.assertEquals(0, sync(otherSet.size()).intValue());
}
}
Loading…
Cancel
Save