RedissonReactive.createTransaction method added.

parent 220370e47d
commit c3655e5ef4

@ -34,11 +34,11 @@ import org.redisson.command.CommandAsyncExecutor;
public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperLogLog<V> {
protected RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) {
public RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
protected RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);

@ -52,7 +52,9 @@ import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.ReferenceCodecProvider;
@ -88,6 +90,7 @@ import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive;
import org.redisson.reactive.RedissonTransactionReactive;
* Main infrastructure class allows to get access
@ -411,5 +414,10 @@ public class RedissonReactive implements RedissonReactiveClient {
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapReactive<K, V>(codec, commandExecutor, name, options);
public RTransactionReactive createTransaction(TransactionOptions options) {
return new RedissonTransactionReactive(commandExecutor, options);

@ -566,4 +566,10 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return new RedissonLock(commandExecutor, lockName);
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern) {
throw new UnsupportedOperationException();

@ -128,6 +128,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return get(f);
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>();

@ -107,9 +107,9 @@ public class RedissonTopic<M> implements RTopic<M> {
public RFuture<Integer> addListenerAsync(final MessageListener<M> listener) {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
final PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);
RPromise<Integer> result = new RedissonPromise<Integer>();
final RPromise<Integer> result = new RedissonPromise<Integer>();
future.addListener(new FutureListener<PubSubConnectionEntry>() {
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {

@ -15,6 +15,7 @@
package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -28,6 +29,8 @@ public interface ScanIterator {
ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern);
RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern);
boolean remove(Object value);

@ -46,6 +46,8 @@ public interface RBucketReactive<V> extends RExpirableReactive {
Publisher<V> get();
Publisher<V> getAndDelete();
Publisher<Void> set(V value);
Publisher<Void> set(V value, long timeToLive, TimeUnit timeUnit);

@ -19,6 +19,12 @@ import java.util.Collection;
import org.reactivestreams.Publisher;
* @author Nikita Koksharov
* @param <V>
public interface RHyperLogLogReactive<V> extends RExpirableReactive {
Publisher<Boolean> add(V obj);

@ -16,11 +16,130 @@
package org.redisson.api;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
* @author Nikita Koksharov
public interface RKeysReactive {
* Move object to another database
* @param name of object
* @param database - Redis database number
* @return <code>true</code> if key was moved else <code>false</code>
Publisher<Boolean> move(String name, int database);
* Transfer object from source Redis instance to destination Redis instance
* @param name of object
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
Publisher<Void> migrate(String name, String host, int port, int database, long timeout);
* Copy object from source Redis instance to destination Redis instance
* @param name of object
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
Publisher<Void> copy(String name, String host, int port, int database, long timeout);
* Set a timeout for object. After the timeout has expired,
* the key will automatically be deleted.
* @param name of object
* @param timeToLive - timeout before object will be deleted
* @param timeUnit - timeout time unit
* @return <code>true</code> if the timeout was set and <code>false</code> if not
Publisher<Boolean> expire(String name, long timeToLive, TimeUnit timeUnit);
* Set an expire date for object. When expire date comes
* the key will automatically be deleted.
* @param name of object
* @param timestamp - expire date in milliseconds (Unix timestamp)
* @return <code>true</code> if the timeout was set and <code>false</code> if not
Publisher<Boolean> expireAt(String name, long timestamp);
* Clear an expire timeout or expire date for object.
* @param name of object
* @return <code>true</code> if timeout was removed
* <code>false</code> if object does not exist or does not have an associated timeout
Publisher<Boolean> clearExpire(String name);
* Rename object with <code>oldName</code> to <code>newName</code>
* only if new key is not exists
* @param oldName - old name of object
* @param newName - new name of object
* @return <code>true</code> if object has been renamed successfully and <code>false</code> otherwise
Publisher<Boolean> renamenx(String oldName, String newName);
* Rename current object key to <code>newName</code>
* @param currentName - current name of object
* @param newName - new name of object
Publisher<Void> rename(String currentName, String newName);
* Remaining time to live of Redisson object that has a timeout
* @param name of key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
Publisher<Long> remainTimeToLive(String name);
* Update the last access time of an object.
* @param names of keys
* @return count of objects were touched
Publisher<Long> touch(String... names);
* Checks if provided keys exist
* @param names of keys
* @return amount of existing keys
Publisher<Long> countExists(String... names);
* Get Redis object type by key
* @param key - name of key
* @return type of key
Publisher<RType> getType(String key);
* Load keys in incrementally iterate mode.
@ -105,6 +224,24 @@ public interface RKeysReactive {
Publisher<Long> delete(String ... keys);
* Delete multiple objects by name.
* Actual removal will happen later asynchronously.
* <p>
* Requires Redis 4.0+
* @param keys of objects
* @return number of removed keys
Publisher<Long> unlink(String ... keys);
* Returns the number of keys in the currently-selected database
* @return count of keys
Publisher<Long> count();
* Delete all the keys of the currently selected database

@ -36,9 +36,10 @@ public interface RObjectReactive {
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
* @return void
Publisher<Void> migrate(String host, int port, int database);
Publisher<Void> migrate(String host, int port, int database, long timeout);
* Move object to another database in mode

@ -0,0 +1,157 @@
* Copyright 2018 Nikita Koksharov
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.redisson.api;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
* Transaction object allows to execute transactions over Redisson objects.
* Uses locks for write operations and maintains data modification operations list till the commit/rollback operation.
* <p>
* Transaction isolation level: <b>READ_COMMITTED</b>
* @author Nikita Koksharov
public interface RTransactionReactive {
* Returns transactional object holder instance by name.
* @param <V> type of value
* @param name - name of object
* @return Bucket object
<V> RBucketReactive<V> getBucket(String name);
* Returns transactional object holder instance by name
* using provided codec for object.
* @param <V> type of value
* @param name - name of object
* @param codec - codec for values
* @return Bucket object
<V> RBucketReactive<V> getBucket(String name, Codec codec);
* Returns transactional map instance by name.
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @return Map object
<K, V> RMapReactive<K, V> getMap(String name);
* Returns transactional map instance by name
* using provided codec for both map keys and values.
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param codec - codec for keys and values
* @return Map object
<K, V> RMapReactive<K, V> getMap(String name, Codec codec);
* Returns transactional set instance by name.
* @param <V> type of value
* @param name - name of object
* @return Set object
<V> RSetReactive<V> getSet(String name);
* Returns transactional set instance by name
* using provided codec for set objects.
* @param <V> type of value
* @param name - name of object
* @param codec - codec for values
* @return Set object
<V> RSetReactive<V> getSet(String name, Codec codec);
* Returns transactional set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String)}.</p>
* @param <V> type of value
* @param name - name of object
* @return SetCache object
<V> RSetCacheReactive<V> getSetCache(String name);
* Returns transactional set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
* @param <V> type of value
* @param name - name of object
* @param codec - codec for values
* @return SetCache object
<V> RSetCacheReactive<V> getSetCache(String name, Codec codec);
* Returns transactional map-based cache instance by name.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @return MapCache object
<K, V> RMapCacheReactive<K, V> getMapCache(String name);
* Returns transactional map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.
* @param <K> type of key
* @param <V> type of value
* @param name - object name
* @param codec - codec for keys and values
* @return MapCache object
<K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec);
* Commits all changes made on this transaction.
Publisher<Void> commit();
* Rollback all changes made on this transaction.
Publisher<Void> rollback();

@ -501,6 +501,14 @@ public interface RedissonReactiveClient {
RScriptReactive getScript();
* Creates transaction with <b>READ_COMMITTED</b> isolation level.
* @param options - transaction configuration
* @return Transaction object
RTransactionReactive createTransaction(TransactionOptions options);
* Return batch object which executes group of
* command in pipeline.

@ -35,10 +35,15 @@ public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive impl
private final RAtomicDoubleAsync instance;
public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonAtomicDouble(commandExecutor, name);
this(commandExecutor, name, new RedissonAtomicDouble(commandExecutor, name));
public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicDoubleAsync instance) {
super(commandExecutor, name, instance);
this.instance = instance;
public Publisher<Double> addAndGet(final double delta) {
return reactive(new Supplier<RFuture<Double>>() {

@ -35,8 +35,12 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem
private final RAtomicLongAsync instance;
public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonAtomicLong(commandExecutor, name);
this(commandExecutor, name, new RedissonAtomicLong(commandExecutor, name));
public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicLongAsync instance) {
super(commandExecutor, name, instance);
this.instance = instance;

@ -42,12 +42,12 @@ abstract class RedissonBaseMultimapReactive<K, V> extends RedissonExpirableReact
private final RMultimap<K, V> instance;
public RedissonBaseMultimapReactive(RMultimap<K, V> instance, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
super(commandExecutor, name, instance);
this.instance = instance;
public RedissonBaseMultimapReactive(RMultimap<K, V> instance, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
super(codec, commandExecutor, name, instance);
this.instance = instance;

@ -19,6 +19,7 @@ import java.util.BitSet;
import org.reactivestreams.Publisher;
import org.redisson.RedissonBitSet;
import org.redisson.api.RBitSetAsync;
import org.redisson.api.RBitSetReactive;
import org.redisson.api.RFuture;
import org.redisson.client.codec.BitSetCodec;
@ -35,11 +36,15 @@ import reactor.rx.Streams;
public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive {
private final RedissonBitSet instance;
private final RBitSetAsync instance;
public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
this.instance = new RedissonBitSet(connectionManager, name);
this(connectionManager, name, new RedissonBitSet(connectionManager, name));
public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name, RBitSetAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
public Publisher<Boolean> get(final long bitIndex) {

@ -38,13 +38,21 @@ public class RedissonBucketReactive<V> extends RedissonExpirableReactive impleme
private final RBucketAsync<V> instance;
public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
instance = new RedissonBucket<V>(connectionManager, name);
this(connectionManager, name, new RedissonBucket<V>(connectionManager, name));
public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name, RBucketAsync<V> instance) {
super(connectionManager, name, instance);
this.instance = instance;
public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) {
super(codec, connectionManager, name);
instance = new RedissonBucket<V>(codec, connectionManager, name);
this(codec, connectionManager, name, new RedissonBucket<V>(codec, connectionManager, name));
public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RBucketAsync<V> instance) {
super(codec, connectionManager, name, instance);
this.instance = instance;
@ -57,6 +65,16 @@ public class RedissonBucketReactive<V> extends RedissonExpirableReactive impleme
public Publisher<V> getAndDelete() {
return reactive(new Supplier<RFuture<V>>() {
public RFuture<V> get() {
return instance.getAndDeleteAsync();
public Publisher<Void> set(final V value) {
return reactive(new Supplier<RFuture<Void>>() {

@ -19,12 +19,14 @@ import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.api.RExpirableAsync;
import org.redisson.api.RExpirableReactive;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
* @author Nikita Koksharov
@ -32,22 +34,32 @@ import org.redisson.command.CommandReactiveExecutor;
abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive {
RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(connectionManager, name, instance);
RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) {
super(codec, connectionManager, name);
RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(codec, connectionManager, name, instance);
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toMillis(timeToLive));
public Publisher<Boolean> expire(final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.expireAsync(timeToLive, timeUnit);
public Publisher<Boolean> expireAt(long timestamp) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIREAT, getName(), timestamp);
public Publisher<Boolean> expireAt(final long timestamp) {
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.expireAtAsync(timestamp);
@ -57,12 +69,22 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen
public Publisher<Boolean> clearExpire() {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName());
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.clearExpireAsync();
public Publisher<Long> remainTimeToLive() {
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.PTTL, getName());
return reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.remainTimeToLiveAsync();

@ -21,11 +21,16 @@ import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.RedissonHyperLogLog;
import org.redisson.api.RFuture;
import org.redisson.api.RHyperLogLogAsync;
import org.redisson.api.RHyperLogLogReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
* @author Nikita Koksharov
@ -34,46 +39,66 @@ import org.redisson.command.CommandReactiveExecutor;
public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive implements RHyperLogLogReactive<V> {
private final RHyperLogLogAsync<V> instance;
public RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
super(commandExecutor, name, new RedissonHyperLogLog<V>(commandExecutor, name));
this.instance = (RHyperLogLogAsync<V>) super.instance;
public RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
super(codec, commandExecutor, name, new RedissonHyperLogLog<V>(commandExecutor, name));
this.instance = (RHyperLogLogAsync<V>) super.instance;
public Publisher<Boolean> add(V obj) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), encode(obj));
public Publisher<Boolean> add(final V obj) {
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.addAsync(obj);
public Publisher<Boolean> addAll(Collection<V> objects) {
List<Object> args = new ArrayList<Object>(objects.size() + 1);
encode(args, objects);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), args.toArray());
public Publisher<Boolean> addAll(final Collection<V> objects) {
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.addAllAsync(objects);
public Publisher<Long> count() {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, getName());
return reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.countAsync();
public Publisher<Long> countWith(String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, args.toArray());
public Publisher<Long> countWith(final String... otherLogNames) {
return reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.countWithAsync(otherLogNames);
public Publisher<Void> mergeWith(String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFMERGE, args.toArray());
public Publisher<Void> mergeWith(final String... otherLogNames) {
return reactive(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return instance.mergeWithAsync(otherLogNames);

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -25,6 +26,7 @@ import org.reactivestreams.Subscription;
import org.redisson.RedissonKeys;
import org.redisson.api.RFuture;
import org.redisson.api.RKeysReactive;
import org.redisson.api.RType;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
@ -220,4 +222,144 @@ public class RedissonKeysReactive implements RKeysReactive {
public Publisher<Boolean> move(final String name, final int database) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.moveAsync(name, database);
public Publisher<Void> migrate(final String name, final String host, final int port, final int database, final long timeout) {
return commandExecutor.reactive(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return instance.migrateAsync(name, host, port, database, timeout);
public Publisher<Void> copy(final String name, final String host, final int port, final int database, final long timeout) {
return commandExecutor.reactive(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return instance.copyAsync(name, host, port, database, timeout);
public Publisher<Boolean> expire(final String name, final long timeToLive, final TimeUnit timeUnit) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.expireAsync(name, timeToLive, timeUnit);
public Publisher<Boolean> expireAt(final String name, final long timestamp) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.expireAtAsync(name, timestamp);
public Publisher<Boolean> clearExpire(final String name) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.clearExpireAsync(name);
public Publisher<Boolean> renamenx(final String oldName, final String newName) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.renamenxAsync(oldName, newName);
public Publisher<Void> rename(final String currentName, final String newName) {
return commandExecutor.reactive(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return instance.renameAsync(currentName, newName);
public Publisher<Long> remainTimeToLive(final String name) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.remainTimeToLiveAsync(name);
public Publisher<Long> touch(final String... names) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.touchAsync(names);
public Publisher<Long> countExists(final String... names) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.countExistsAsync(names);
public Publisher<RType> getType(final String key) {
return commandExecutor.reactive(new Supplier<RFuture<RType>>() {
public RFuture<RType> get() {
return instance.getTypeAsync(key);
public Publisher<Long> unlink(final String... keys) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.unlinkAsync(keys);
public Publisher<Long> count() {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.countAsync();

@ -29,6 +29,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.RedissonList;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
@ -51,16 +52,16 @@ import reactor.rx.subscription.ReactiveSubscription;
public class RedissonListReactive<V> extends RedissonExpirableReactive implements RListReactive<V> {
private final RedissonList<V> instance;
private final RListAsync<V> instance;
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonList<V>(commandExecutor, name, null);
super(commandExecutor, name, new RedissonList<V>(commandExecutor, name, null));
this.instance = (RListAsync<V>) super.instance;
public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonList<V>(codec, commandExecutor, name, null);
super(codec, commandExecutor, name, new RedissonList<V>(codec, commandExecutor, name, null));
this.instance = (RListAsync<V>) super.instance;
@ -299,7 +300,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
return reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.indexOfAsync(o, new LongReplayConvertor());
return ((RedissonList)instance).indexOfAsync(o, new LongReplayConvertor());
@ -309,7 +310,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
return reactive(new Supplier<RFuture<Long>>() {
public RFuture<Long> get() {
return instance.lastIndexOfAsync(o, new LongReplayConvertor());
return ((RedissonList)instance).lastIndexOfAsync(o, new LongReplayConvertor());

@ -22,7 +22,6 @@ import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
@ -37,12 +36,12 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R
private final RLockAsync instance;
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
instance = createLock(connectionManager, name);
this(connectionManager, name, new RedissonLock(connectionManager, name));
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return new RedissonLock(commandExecutor, name);
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, RLockAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;

@ -64,13 +64,21 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
private final RMapCacheAsync<K, V> mapCache;
public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options);
this(commandExecutor, name, options, new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options));
public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapCacheAsync<K, V> mapCache) {
super(commandExecutor, name, mapCache);
this.mapCache = mapCache;
public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(codec, commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options);
this(codec, commandExecutor, name, options, new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options));
public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapCacheAsync<K, V> mapCache) {
super(codec, commandExecutor, name, mapCache);
this.mapCache = mapCache;

@ -15,7 +15,6 @@
package org.redisson.reactive;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
@ -55,13 +54,21 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
private final RMapAsync<K, V> instance;
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(commandExecutor, name);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, options);
this(commandExecutor, name, options, new RedissonMap<K, V>(commandExecutor, name, null, options));
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapAsync<K, V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(codec, commandExecutor, name);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, options);
this(codec, commandExecutor, name, options, new RedissonMap<K, V>(codec, commandExecutor, name, null, options));
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapAsync<K, V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;

@ -20,10 +20,10 @@ import java.util.Collection;
import org.reactivestreams.Publisher;
import org.redisson.RedissonReference;
import org.redisson.api.RExpirableAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RObjectReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.misc.RedissonObjectFactory;
@ -43,19 +43,21 @@ abstract class RedissonObjectReactive implements RObjectReactive {
final CommandReactiveExecutor commandExecutor;
private final String name;
final Codec codec;
protected RExpirableAsync instance;
public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) {
this.codec = codec; = name;
this.commandExecutor = commandExecutor;
this.instance = instance;
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
return commandExecutor.reactive(supplier);
public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance);
protected <V> Stream<V> newSucceeded(V result) {
@ -124,33 +126,63 @@ abstract class RedissonObjectReactive implements RObjectReactive {
public Publisher<Void> rename(String newName) {
return commandExecutor.writeReactive(getName(), RedisCommands.RENAME, getName(), newName);
public Publisher<Void> rename(final String newName) {
return reactive(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return instance.renameAsync(newName);
public Publisher<Void> migrate(String host, int port, int database) {
return commandExecutor.writeReactive(getName(), RedisCommands.MIGRATE, host, port, getName(), database);
public Publisher<Void> migrate(final String host, final int port, final int database, final long timeout) {
return reactive(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return instance.migrateAsync(host, port, database, timeout);
public Publisher<Boolean> move(int database) {
return commandExecutor.writeReactive(getName(), RedisCommands.MOVE, getName(), database);
public Publisher<Boolean> move(final int database) {
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.moveAsync(database);
public Publisher<Boolean> renamenx(String newName) {
return commandExecutor.writeReactive(getName(), RedisCommands.RENAMENX, getName(), newName);
public Publisher<Boolean> renamenx(final String newName) {
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.renamenxAsync(newName);
public Publisher<Boolean> delete() {
return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName());
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.deleteAsync();
public Publisher<Boolean> isExists() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.EXISTS, getName());
return reactive(new Supplier<RFuture<Boolean>>() {
public RFuture<Boolean> get() {
return instance.isExistsAsync();

@ -40,8 +40,8 @@ public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableR
private final RPermitExpirableSemaphoreAsync instance;
public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name);
instance = new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub);
super(connectionManager, name, new RedissonPermitExpirableSemaphore(connectionManager, name, semaphorePubSub));
instance = (RPermitExpirableSemaphoreAsync) super.instance;
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {

@ -16,11 +16,9 @@
package org.redisson.reactive;
import org.redisson.RedissonReadWriteLock;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
@ -33,28 +31,18 @@ public class RedissonReadWriteLockReactive extends RedissonExpirableReactive imp
private final RReadWriteLock instance;
public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.instance = new RedissonReadWriteLock(commandExecutor, name);
super(commandExecutor, name, new RedissonReadWriteLock(commandExecutor, name));
this.instance = (RReadWriteLock) super.instance;
public RLockReactive readLock() {
return new RedissonLockReactive(commandExecutor, getName()) {
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return instance.readLock();
return new RedissonLockReactive(commandExecutor, getName(), instance.readLock());
public RLockReactive writeLock() {
return new RedissonLockReactive(commandExecutor, getName()) {
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return instance.writeLock();
return new RedissonLockReactive(commandExecutor, getName(), instance.writeLock());

@ -15,7 +15,6 @@
package org.redisson.reactive;
import java.util.Collection;
import java.util.Map;
@ -47,13 +46,21 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
private final RScoredSortedSetAsync<V> instance;
public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonScoredSortedSet<V>(commandExecutor, name, null);
this(commandExecutor, name, new RedissonScoredSortedSet<V>(commandExecutor, name, null));
public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonScoredSortedSet<V>(codec, commandExecutor, name, null);
this(codec, commandExecutor, name, new RedissonScoredSortedSet<V>(codec, commandExecutor, name, null));
public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;

@ -37,8 +37,8 @@ public class RedissonSemaphoreReactive extends RedissonExpirableReactive impleme
private final RSemaphoreAsync instance;
public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name);
instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub);
super(connectionManager, name, new RedissonSemaphore(connectionManager, name, semaphorePubSub));
instance = (RSemaphoreAsync) super.instance;

@ -15,7 +15,6 @@
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -24,7 +23,9 @@ import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCacheAsync;
import org.redisson.api.RSetCacheReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
@ -59,18 +60,27 @@ import reactor.fn.Supplier;
public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> {
private final RedissonSetCache<V> instance;
private final RSetCacheAsync<V> instance;
public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
this(commandExecutor, name, new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null));
public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
this(codec, commandExecutor, name, new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null));
public RedissonSetCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;
public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName());
@ -90,7 +100,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() {
public RFuture<ListScanResult<ScanObjectEntry>> get() {
return instance.scanIteratorAsync(getName(), client, startPos, null);
return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null);

@ -15,7 +15,6 @@
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -25,6 +24,7 @@ import java.util.Set;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RSetAsync;
import org.redisson.api.RSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
@ -45,18 +45,27 @@ import reactor.fn.Supplier;
public class RedissonSetReactive<V> extends RedissonExpirableReactive implements RSetReactive<V> {
private final RedissonSet<V> instance;
private final RSetAsync<V> instance;
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonSet<V>(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null);
this(commandExecutor, name, new RedissonSet<V>(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null));
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name, RSetAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonSet<V>(codec, commandExecutor, name, null);
this(codec, commandExecutor, name, new RedissonSet<V>(codec, commandExecutor, name, null));
public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;
public Publisher<Integer> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>(this).addAll(c);

@ -0,0 +1,119 @@
* Copyright 2018 Nikita Koksharov
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RFuture;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTransaction;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.transaction.RedissonTransaction;
import reactor.fn.Supplier;
* @author Nikita Koksharov
public class RedissonTransactionReactive implements RTransactionReactive {
private final RTransaction transaction;
private final CommandReactiveExecutor executorService;
public RedissonTransactionReactive(CommandReactiveExecutor executorService, TransactionOptions options) {
this.transaction = new RedissonTransaction(executorService, options);
this.executorService = executorService;
public <V> RBucketReactive<V> getBucket(String name) {
return new RedissonBucketReactive<V>(executorService, name, transaction.<V>getBucket(name));
public <V> RBucketReactive<V> getBucket(String name, Codec codec) {
return new RedissonBucketReactive<V>(codec, executorService, name, transaction.<V>getBucket(name, codec));
public <K, V> RMapReactive<K, V> getMap(String name) {
return new RedissonMapReactive<K, V>(executorService, name, null, transaction.<K, V>getMap(name));
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
return new RedissonMapReactive<K, V>(codec, executorService, name, null, transaction.<K, V>getMap(name, codec));
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(codec, executorService, name, null, transaction.<K, V>getMapCache(name, codec));
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(executorService, name, null, transaction.<K, V>getMapCache(name));
public <V> RSetReactive<V> getSet(String name) {
return new RedissonSetReactive<V>(executorService, name, transaction.<V>getSet(name));
public <V> RSetReactive<V> getSet(String name, Codec codec) {
return new RedissonSetReactive<V>(codec, executorService, name, transaction.<V>getSet(name, codec));
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(executorService, name, transaction.<V>getSetCache(name));
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, executorService, name, transaction.<V>getSetCache(name, codec));
public Publisher<Void> commit() {
return new NettyFuturePublisher<Void>(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return transaction.commitAsync();
public Publisher<Void> rollback() {
return new NettyFuturePublisher<Void>(new Supplier<RFuture<Void>>() {
public RFuture<Void> get() {
return transaction.rollbackAsync();

@ -510,7 +510,7 @@ public class BaseTransactionalMap<K, V> {
executeLocked(result, new Runnable() {
public void run() {
AtomicLong counter = new AtomicLong();
final AtomicLong counter = new AtomicLong();
List<K> keyList = Arrays.asList(keys);
for (Iterator<K> iterator = keyList.iterator(); iterator.hasNext();) {
K key =;

@ -1,3 +1,18 @@
* Copyright 2018 Nikita Koksharov
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.redisson.transaction;
import org.redisson.client.codec.Codec;

@ -1,3 +1,18 @@
* Copyright 2018 Nikita Koksharov
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.redisson.transaction;
import java.util.ArrayList;

@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -176,13 +175,14 @@ public class RedissonTransaction implements RTransaction {
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager());
final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager());
for (TransactionalOperation transactionalOperation : operations) {
System.out.println("transactionalOperation " + transactionalOperation);
String id = generateId();
RPromise<Void> result = new RedissonPromise<Void>();
final String id = generateId();
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id);
future.addListener(new FutureListener<Map<HashKey, HashValue>>() {
@ -192,7 +192,7 @@ public class RedissonTransaction implements RTransaction {
Map<HashKey, HashValue> hashes = future.getNow();
final Map<HashKey, HashValue> hashes = future.getNow();
try {
} catch (TransactionTimeoutException e) {
@ -420,13 +420,13 @@ public class RedissonTransaction implements RTransaction {
return hashes;
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(String requestId) {
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(final String requestId) {
if (localCaches.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.emptyMap());
return RedissonPromise.newSucceededFuture(Collections.<HashKey, HashValue>emptyMap());
RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<Map<HashKey, HashValue>>();
Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
final RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<Map<HashKey, HashValue>>();
final Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
@ -459,14 +459,14 @@ public class RedissonTransaction implements RTransaction {
CountableListener<Map<HashKey, HashValue>> listener =
final CountableListener<Map<HashKey, HashValue>> listener =
new CountableListener<Map<HashKey, HashValue>>(result, hashes);
RPromise<Void> subscriptionFuture = new RedissonPromise<Void>();
CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null);
final CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null);
List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
final List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
final String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX);
RTopic<Object> topic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE,
@ -588,7 +588,7 @@ public class RedissonTransaction implements RTransaction {
RPromise<Void> result = new RedissonPromise<Void>();
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Object> future = executorService.executeAsync(BatchOptions.defaults());
future.addListener(new FutureListener<Object>() {

@ -0,0 +1,148 @@
package org.redisson.transaction;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.BaseReactiveTest;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.TransactionOptions;
public class RedissonTransactionalBucketReactiveTest extends BaseReactiveTest {
public void testTimeout() throws InterruptedException {
RBucketReactive<String> b = redisson.getBucket("test");
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults().timeout(3, TimeUnit.SECONDS));
RBucketReactive<String> bucket = transaction.getBucket("test");
try {
} catch (TransactionException e) {
// skip
public void testSet() {
RBucketReactive<String> b = redisson.getBucket("test");
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
public void testGetAndSet() {
RBucketReactive<String> b = redisson.getBucket("test");
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
public void testCompareAndSet() {
RBucketReactive<String> b = redisson.getBucket("test");
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
assertThat(sync(bucket.compareAndSet("0", "434"))).isFalse();
assertThat(sync(bucket.compareAndSet("123", "232"))).isTrue();
public void testTrySet() {
RBucketReactive<String> b = redisson.getBucket("test");
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
public void testGetAndRemove() {
RBucketReactive<String> m = redisson.getBucket("test");
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> set = transaction.getBucket("test");
public void testRollback() {
RBucketReactive<Object> b = redisson.getBucket("test");
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<Object> bucket = transaction.getBucket("test");