Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java
pull/1821/head
Nikita 7 years ago
commit 804b158229

@ -1,6 +1,6 @@
Redisson: Redis based In-Memory Data Grid for Java. Redisson: Redis based In-Memory Data Grid for Java.
==== ====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.5.7) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)**
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.

@ -34,11 +34,11 @@ import org.redisson.command.CommandAsyncExecutor;
*/ */
public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperLogLog<V> { public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperLogLog<V> {
protected RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) { public RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name);
} }
protected RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) { public RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
} }

@ -52,7 +52,9 @@ import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive; import org.redisson.api.RSetReactive;
import org.redisson.api.RTopicReactive; import org.redisson.api.RTopicReactive;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.RedissonReactiveClient; import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.ReferenceCodecProvider; import org.redisson.codec.ReferenceCodecProvider;
@ -88,6 +90,7 @@ import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetMultimapReactive; import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive; import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive; import org.redisson.reactive.RedissonTopicReactive;
import org.redisson.reactive.RedissonTransactionReactive;
/** /**
* Main infrastructure class allows to get access * Main infrastructure class allows to get access
@ -411,5 +414,10 @@ public class RedissonReactive implements RedissonReactiveClient {
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) { public <K, V> RMapReactive<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapReactive<K, V>(codec, commandExecutor, name, options); return new RedissonMapReactive<K, V>(codec, commandExecutor, name, options);
} }
@Override
public RTransactionReactive createTransaction(TransactionOptions options) {
return new RedissonTransactionReactive(commandExecutor, options);
}
} }

@ -565,5 +565,11 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
String lockName = getLockName(value); String lockName = getLockName(value);
return new RedissonLock(commandExecutor, lockName); return new RedissonLock(commandExecutor, lockName);
} }
@Override
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern) {
throw new UnsupportedOperationException();
}
} }

@ -128,6 +128,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return get(f); return get(f);
} }
@Override
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>(); List<Object> params = new ArrayList<Object>();
params.add(startPos); params.add(startPos);

@ -104,6 +104,25 @@ public class RedissonTopic<M> implements RTopic<M> {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name); PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
return addListener(pubSubListener); return addListener(pubSubListener);
} }
@Override
public RFuture<Integer> addListenerAsync(final MessageListener<M> listener) {
final PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);
final RPromise<Integer> result = new RedissonPromise<Integer>();
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
result.trySuccess(System.identityHashCode(pubSubListener));
}
});
return result;
}
private int addListener(RedisPubSubListener<?> pubSubListener) { private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener); RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);

@ -15,6 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -28,6 +29,8 @@ public interface ScanIterator {
ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern); ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern);
RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern);
boolean remove(Object value); boolean remove(Object value);
} }

@ -45,6 +45,8 @@ public interface RBucketReactive<V> extends RExpirableReactive {
Publisher<V> getAndSet(V newValue); Publisher<V> getAndSet(V newValue);
Publisher<V> get(); Publisher<V> get();
Publisher<V> getAndDelete();
Publisher<Void> set(V value); Publisher<Void> set(V value);

@ -19,6 +19,12 @@ import java.util.Collection;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
* @param <V>
*/
public interface RHyperLogLogReactive<V> extends RExpirableReactive { public interface RHyperLogLogReactive<V> extends RExpirableReactive {
Publisher<Boolean> add(V obj); Publisher<Boolean> add(V obj);

@ -16,11 +16,130 @@
package org.redisson.api; package org.redisson.api;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
*/
public interface RKeysReactive { public interface RKeysReactive {
/**
* Move object to another database
*
* @param name of object
* @param database - Redis database number
* @return <code>true</code> if key was moved else <code>false</code>
*/
Publisher<Boolean> move(String name, int database);
/**
* Transfer object from source Redis instance to destination Redis instance
*
* @param name of object
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
*/
Publisher<Void> migrate(String name, String host, int port, int database, long timeout);
/**
* Copy object from source Redis instance to destination Redis instance
*
* @param name of object
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
*/
Publisher<Void> copy(String name, String host, int port, int database, long timeout);
/**
* Set a timeout for object. After the timeout has expired,
* the key will automatically be deleted.
*
* @param name of object
* @param timeToLive - timeout before object will be deleted
* @param timeUnit - timeout time unit
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Publisher<Boolean> expire(String name, long timeToLive, TimeUnit timeUnit);
/**
* Set an expire date for object. When expire date comes
* the key will automatically be deleted.
*
* @param name of object
* @param timestamp - expire date in milliseconds (Unix timestamp)
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Publisher<Boolean> expireAt(String name, long timestamp);
/**
* Clear an expire timeout or expire date for object.
*
* @param name of object
* @return <code>true</code> if timeout was removed
* <code>false</code> if object does not exist or does not have an associated timeout
*/
Publisher<Boolean> clearExpire(String name);
/**
* Rename object with <code>oldName</code> to <code>newName</code>
* only if new key is not exists
*
* @param oldName - old name of object
* @param newName - new name of object
* @return <code>true</code> if object has been renamed successfully and <code>false</code> otherwise
*/
Publisher<Boolean> renamenx(String oldName, String newName);
/**
* Rename current object key to <code>newName</code>
*
* @param currentName - current name of object
* @param newName - new name of object
*/
Publisher<Void> rename(String currentName, String newName);
/**
* Remaining time to live of Redisson object that has a timeout
*
* @param name of key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Publisher<Long> remainTimeToLive(String name);
/**
* Update the last access time of an object.
*
* @param names of keys
* @return count of objects were touched
*/
Publisher<Long> touch(String... names);
/**
* Checks if provided keys exist
*
* @param names of keys
* @return amount of existing keys
*/
Publisher<Long> countExists(String... names);
/**
* Get Redis object type by key
*
* @param key - name of key
* @return type of key
*/
Publisher<RType> getType(String key);
/** /**
* Load keys in incrementally iterate mode. * Load keys in incrementally iterate mode.
* *
@ -105,6 +224,24 @@ public interface RKeysReactive {
*/ */
Publisher<Long> delete(String ... keys); Publisher<Long> delete(String ... keys);
/**
* Delete multiple objects by name.
* Actual removal will happen later asynchronously.
* <p>
* Requires Redis 4.0+
*
* @param keys of objects
* @return number of removed keys
*/
Publisher<Long> unlink(String ... keys);
/**
* Returns the number of keys in the currently-selected database
*
* @return count of keys
*/
Publisher<Long> count();
/** /**
* Delete all the keys of the currently selected database * Delete all the keys of the currently selected database
* *

@ -36,9 +36,10 @@ public interface RObjectReactive {
* @param host - destination host * @param host - destination host
* @param port - destination port * @param port - destination port
* @param database - destination database * @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
* @return void * @return void
*/ */
Publisher<Void> migrate(String host, int port, int database); Publisher<Void> migrate(String host, int port, int database, long timeout);
/** /**
* Move object to another database in mode * Move object to another database in mode

@ -15,6 +15,8 @@
*/ */
package org.redisson.api; package org.redisson.api;
import org.redisson.api.listener.MessageListener;
/** /**
* Distributed topic. Messages are delivered to all message listeners across Redis cluster. * Distributed topic. Messages are delivered to all message listeners across Redis cluster.
* *
@ -28,8 +30,19 @@ public interface RTopicAsync<M> {
* Publish the message to all subscribers of this topic asynchronously * Publish the message to all subscribers of this topic asynchronously
* *
* @param message to send * @param message to send
* @return the <code>RFuture</code> object with number of clients that received the message * @return number of clients that received the message
*/ */
RFuture<Long> publishAsync(M message); RFuture<Long> publishAsync(M message);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
* @param listener for messages
* @return locally unique listener id
* @see org.redisson.api.listener.MessageListener
*/
RFuture<Integer> addListenerAsync(MessageListener<M> listener);
} }

@ -157,10 +157,20 @@ public interface RTransaction {
* Commits all changes made on this transaction. * Commits all changes made on this transaction.
*/ */
void commit(); void commit();
/**
* Commits all changes made on this transaction in async mode.
*/
RFuture<Void> commitAsync();
/** /**
* Rollback all changes made on this transaction. * Rollback all changes made on this transaction.
*/ */
void rollback(); void rollback();
/**
* Rollback all changes made on this transaction in async mode.
*/
RFuture<Void> rollbackAsync();
} }

@ -0,0 +1,157 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
/**
* Transaction object allows to execute transactions over Redisson objects.
* Uses locks for write operations and maintains data modification operations list till the commit/rollback operation.
* <p>
* Transaction isolation level: <b>READ_COMMITTED</b>
*
* @author Nikita Koksharov
*
*/
public interface RTransactionReactive {
/**
* Returns transactional object holder instance by name.
*
* @param <V> type of value
* @param name - name of object
* @return Bucket object
*/
<V> RBucketReactive<V> getBucket(String name);
/**
* Returns transactional object holder instance by name
* using provided codec for object.
*
* @param <V> type of value
* @param name - name of object
* @param codec - codec for values
* @return Bucket object
*/
<V> RBucketReactive<V> getBucket(String name, Codec codec);
/**
* Returns transactional map instance by name.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @return Map object
*/
<K, V> RMapReactive<K, V> getMap(String name);
/**
* Returns transactional map instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param codec - codec for keys and values
* @return Map object
*/
<K, V> RMapReactive<K, V> getMap(String name, Codec codec);
/**
* Returns transactional set instance by name.
*
* @param <V> type of value
* @param name - name of object
* @return Set object
*/
<V> RSetReactive<V> getSet(String name);
/**
* Returns transactional set instance by name
* using provided codec for set objects.
*
* @param <V> type of value
* @param name - name of object
* @param codec - codec for values
* @return Set object
*/
<V> RSetReactive<V> getSet(String name, Codec codec);
/**
* Returns transactional set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String)}.</p>
*
* @param <V> type of value
* @param name - name of object
* @return SetCache object
*/
<V> RSetCacheReactive<V> getSetCache(String name);
/**
* Returns transactional set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param <V> type of value
* @param name - name of object
* @param codec - codec for values
* @return SetCache object
*/
<V> RSetCacheReactive<V> getSetCache(String name, Codec codec);
/**
* Returns transactional map-based cache instance by name.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @return MapCache object
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name);
/**
* Returns transactional map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.
*
* @param <K> type of key
* @param <V> type of value
* @param name - object name
* @param codec - codec for keys and values
* @return MapCache object
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec);
/**
* Commits all changes made on this transaction.
*/
Publisher<Void> commit();
/**
* Rollback all changes made on this transaction.
*/
Publisher<Void> rollback();
}

@ -501,6 +501,14 @@ public interface RedissonReactiveClient {
*/ */
RScriptReactive getScript(); RScriptReactive getScript();
/**
* Creates transaction with <b>READ_COMMITTED</b> isolation level.
*
* @param options - transaction configuration
* @return Transaction object
*/
RTransactionReactive createTransaction(TransactionOptions options);
/** /**
* Return batch object which executes group of * Return batch object which executes group of
* command in pipeline. * command in pipeline.

@ -132,7 +132,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
/** /**
* Redis Slave node is excluded from the internal list of available nodes * Redis Slave node failing to execute commands is excluded from the internal list of available nodes
* when the time interval from the moment of first Redis command execution failure * when the time interval from the moment of first Redis command execution failure
* on this server reaches <code>slaveFailsInterval</code> value. * on this server reaches <code>slaveFailsInterval</code> value.
* <p> * <p>

@ -42,6 +42,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener; import org.redisson.misc.TransferListener;

@ -44,6 +44,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.SentinelServersConfig; import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;

@ -33,10 +33,10 @@ import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.CountableListener;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool; import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool; import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;

@ -13,12 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.connection; package org.redisson.misc;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
@ -51,6 +49,15 @@ public class CountableListener<T> implements FutureListener<Object> {
counter.incrementAndGet(); counter.incrementAndGet();
} }
public void decCounter() {
if (counter.decrementAndGet() == 0) {
onSuccess(value);
if (result != null) {
result.trySuccess(value);
}
}
}
@Override @Override
public void operationComplete(Future<Object> future) throws Exception { public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
@ -59,13 +66,8 @@ public class CountableListener<T> implements FutureListener<Object> {
} }
return; return;
} }
if (counter.decrementAndGet() == 0) { decCounter();
onSuccess(value);
if (result != null) {
result.trySuccess(value);
}
}
} }
protected void onSuccess(T value) { protected void onSuccess(T value) {

@ -35,9 +35,14 @@ public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive impl
private final RAtomicDoubleAsync instance; private final RAtomicDoubleAsync instance;
public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); this(commandExecutor, name, new RedissonAtomicDouble(commandExecutor, name));
instance = new RedissonAtomicDouble(commandExecutor, name);
} }
public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicDoubleAsync instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
@Override @Override
public Publisher<Double> addAndGet(final double delta) { public Publisher<Double> addAndGet(final double delta) {

@ -36,10 +36,14 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem
private final RAtomicLongAsync instance; private final RAtomicLongAsync instance;
public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); this(commandExecutor, name, new RedissonAtomicLong(commandExecutor, name));
instance = new RedissonAtomicLong(commandExecutor, name);
} }
public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicLongAsync instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
@Override @Override
public Publisher<Long> addAndGet(final long delta) { public Publisher<Long> addAndGet(final long delta) {
return reactive(new Supplier<RFuture<Long>>() { return reactive(new Supplier<RFuture<Long>>() {

@ -42,12 +42,12 @@ abstract class RedissonBaseMultimapReactive<K, V> extends RedissonExpirableReact
private final RMultimap<K, V> instance; private final RMultimap<K, V> instance;
public RedissonBaseMultimapReactive(RMultimap<K, V> instance, CommandReactiveExecutor commandExecutor, String name) { public RedissonBaseMultimapReactive(RMultimap<K, V> instance, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name, instance);
this.instance = instance; this.instance = instance;
} }
public RedissonBaseMultimapReactive(RMultimap<K, V> instance, Codec codec, CommandReactiveExecutor commandExecutor, String name) { public RedissonBaseMultimapReactive(RMultimap<K, V> instance, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name, instance);
this.instance = instance; this.instance = instance;
} }

@ -20,6 +20,7 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonBitSet; import org.redisson.RedissonBitSet;
import org.redisson.api.RBitSetAsync;
import org.redisson.api.RBitSetReactive; import org.redisson.api.RBitSetReactive;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.codec.BitSetCodec; import org.redisson.client.codec.BitSetCodec;
@ -35,11 +36,15 @@ import reactor.core.publisher.Mono;
*/ */
public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive { public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive {
private final RedissonBitSet instance; private final RBitSetAsync instance;
public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) { public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name); this(connectionManager, name, new RedissonBitSet(connectionManager, name));
this.instance = new RedissonBitSet(connectionManager, name); }
public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name, RBitSetAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
} }
public Publisher<Boolean> get(final long bitIndex) { public Publisher<Boolean> get(final long bitIndex) {

@ -37,13 +37,21 @@ public class RedissonBucketReactive<V> extends RedissonExpirableReactive impleme
private final RBucketAsync<V> instance; private final RBucketAsync<V> instance;
public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) { public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name); this(connectionManager, name, new RedissonBucket<V>(connectionManager, name));
instance = new RedissonBucket<V>(connectionManager, name); }
public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name, RBucketAsync<V> instance) {
super(connectionManager, name, instance);
this.instance = instance;
} }
public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) {
super(codec, connectionManager, name); this(codec, connectionManager, name, new RedissonBucket<V>(codec, connectionManager, name));
instance = new RedissonBucket<V>(codec, connectionManager, name); }
public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RBucketAsync<V> instance) {
super(codec, connectionManager, name, instance);
this.instance = instance;
} }
@Override @Override
@ -55,6 +63,16 @@ public class RedissonBucketReactive<V> extends RedissonExpirableReactive impleme
} }
}); });
} }
@Override
public Publisher<V> getAndDelete() {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.getAndDeleteAsync();
}
});
}
@Override @Override
public Publisher<Void> set(final V value) { public Publisher<Void> set(final V value) {

@ -17,12 +17,13 @@ package org.redisson.reactive;
import java.util.Date; import java.util.Date;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.api.RExpirableAsync;
import org.redisson.api.RExpirableReactive; import org.redisson.api.RExpirableReactive;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
/** /**
@ -32,22 +33,32 @@ import org.redisson.command.CommandReactiveExecutor;
*/ */
abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive { abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive {
RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name) { RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(connectionManager, name); super(connectionManager, name, instance);
} }
RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(codec, connectionManager, name); super(codec, connectionManager, name, instance);
} }
@Override @Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) { public Publisher<Boolean> expire(final long timeToLive, final TimeUnit timeUnit) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toMillis(timeToLive)); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.expireAsync(timeToLive, timeUnit);
}
});
} }
@Override @Override
public Publisher<Boolean> expireAt(long timestamp) { public Publisher<Boolean> expireAt(final long timestamp) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIREAT, getName(), timestamp); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.expireAtAsync(timestamp);
}
});
} }
@Override @Override
@ -57,12 +68,22 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen
@Override @Override
public Publisher<Boolean> clearExpire() { public Publisher<Boolean> clearExpire() {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName()); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.clearExpireAsync();
}
});
} }
@Override @Override
public Publisher<Long> remainTimeToLive() { public Publisher<Long> remainTimeToLive() {
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.PTTL, getName()); return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.remainTimeToLiveAsync();
}
});
} }
} }

@ -15,15 +15,15 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonHyperLogLog;
import org.redisson.api.RFuture;
import org.redisson.api.RHyperLogLogAsync;
import org.redisson.api.RHyperLogLogReactive; import org.redisson.api.RHyperLogLogReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
/** /**
@ -34,46 +34,66 @@ import org.redisson.command.CommandReactiveExecutor;
*/ */
public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive implements RHyperLogLogReactive<V> { public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive implements RHyperLogLogReactive<V> {
private final RHyperLogLogAsync<V> instance;
public RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name, new RedissonHyperLogLog<V>(commandExecutor, name));
this.instance = (RHyperLogLogAsync<V>) super.instance;
} }
public RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { public RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name, new RedissonHyperLogLog<V>(commandExecutor, name));
this.instance = (RHyperLogLogAsync<V>) super.instance;
} }
@Override @Override
public Publisher<Boolean> add(V obj) { public Publisher<Boolean> add(final V obj) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), encode(obj)); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.addAsync(obj);
}
});
} }
@Override @Override
public Publisher<Boolean> addAll(Collection<V> objects) { public Publisher<Boolean> addAll(final Collection<V> objects) {
List<Object> args = new ArrayList<Object>(objects.size() + 1); return reactive(new Supplier<RFuture<Boolean>>() {
args.add(getName()); @Override
encode(args, objects); public RFuture<Boolean> get() {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), args.toArray()); return instance.addAllAsync(objects);
}
});
} }
@Override @Override
public Publisher<Long> count() { public Publisher<Long> count() {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, getName()); return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.countAsync();
}
});
} }
@Override @Override
public Publisher<Long> countWith(String... otherLogNames) { public Publisher<Long> countWith(final String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1); return reactive(new Supplier<RFuture<Long>>() {
args.add(getName()); @Override
args.addAll(Arrays.asList(otherLogNames)); public RFuture<Long> get() {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, args.toArray()); return instance.countWithAsync(otherLogNames);
}
});
} }
@Override @Override
public Publisher<Void> mergeWith(String... otherLogNames) { public Publisher<Void> mergeWith(final String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1); return reactive(new Supplier<RFuture<Void>>() {
args.add(getName()); @Override
args.addAll(Arrays.asList(otherLogNames)); public RFuture<Void> get() {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFMERGE, args.toArray()); return instance.mergeWithAsync(otherLogNames);
}
});
} }
} }

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongConsumer; import java.util.function.LongConsumer;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -28,6 +29,7 @@ import org.reactivestreams.Subscription;
import org.redisson.RedissonKeys; import org.redisson.RedissonKeys;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RKeysReactive; import org.redisson.api.RKeysReactive;
import org.redisson.api.RType;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
@ -221,4 +223,144 @@ public class RedissonKeysReactive implements RKeysReactive {
}); });
} }
@Override
public Publisher<Boolean> move(final String name, final int database) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.moveAsync(name, database);
}
});
}
@Override
public Publisher<Void> migrate(final String name, final String host, final int port, final int database, final long timeout) {
return commandExecutor.reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.migrateAsync(name, host, port, database, timeout);
}
});
}
@Override
public Publisher<Void> copy(final String name, final String host, final int port, final int database, final long timeout) {
return commandExecutor.reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.copyAsync(name, host, port, database, timeout);
}
});
}
@Override
public Publisher<Boolean> expire(final String name, final long timeToLive, final TimeUnit timeUnit) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.expireAsync(name, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Boolean> expireAt(final String name, final long timestamp) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.expireAtAsync(name, timestamp);
}
});
}
@Override
public Publisher<Boolean> clearExpire(final String name) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.clearExpireAsync(name);
}
});
}
@Override
public Publisher<Boolean> renamenx(final String oldName, final String newName) {
return commandExecutor.reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.renamenxAsync(oldName, newName);
}
});
}
@Override
public Publisher<Void> rename(final String currentName, final String newName) {
return commandExecutor.reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.renameAsync(currentName, newName);
}
});
}
@Override
public Publisher<Long> remainTimeToLive(final String name) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.remainTimeToLiveAsync(name);
}
});
}
@Override
public Publisher<Long> touch(final String... names) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.touchAsync(names);
}
});
}
@Override
public Publisher<Long> countExists(final String... names) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.countExistsAsync(names);
}
});
}
@Override
public Publisher<RType> getType(final String key) {
return commandExecutor.reactive(new Supplier<RFuture<RType>>() {
@Override
public RFuture<RType> get() {
return instance.getTypeAsync(key);
}
});
}
@Override
public Publisher<Long> unlink(final String... keys) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.unlinkAsync(keys);
}
});
}
@Override
public Publisher<Long> count() {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.countAsync();
}
});
}
} }

@ -34,6 +34,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.redisson.RedissonList; import org.redisson.RedissonList;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RListReactive; import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -53,16 +54,16 @@ import reactor.core.publisher.Mono;
*/ */
public class RedissonListReactive<V> extends RedissonExpirableReactive implements RListReactive<V> { public class RedissonListReactive<V> extends RedissonExpirableReactive implements RListReactive<V> {
private final RedissonList<V> instance; private final RListAsync<V> instance;
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name, new RedissonList<V>(commandExecutor, name, null));
instance = new RedissonList<V>(commandExecutor, name, null); this.instance = (RListAsync<V>) super.instance;
} }
public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name, new RedissonList<V>(codec, commandExecutor, name, null));
instance = new RedissonList<V>(codec, commandExecutor, name, null); this.instance = (RListAsync<V>) super.instance;
} }
@Override @Override
@ -305,7 +306,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
return reactive(new Supplier<RFuture<Long>>() { return reactive(new Supplier<RFuture<Long>>() {
@Override @Override
public RFuture<Long> get() { public RFuture<Long> get() {
return instance.indexOfAsync(o, new LongReplayConvertor()); return ((RedissonList)instance).indexOfAsync(o, new LongReplayConvertor());
} }
}); });
} }
@ -315,7 +316,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
return reactive(new Supplier<RFuture<Long>>() { return reactive(new Supplier<RFuture<Long>>() {
@Override @Override
public RFuture<Long> get() { public RFuture<Long> get() {
return instance.lastIndexOfAsync(o, new LongReplayConvertor()); return ((RedissonList)instance).lastIndexOfAsync(o, new LongReplayConvertor());
} }
}); });
} }

@ -23,7 +23,6 @@ import org.redisson.RedissonLock;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync; import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive; import org.redisson.api.RLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
/** /**
@ -36,12 +35,12 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R
private final RLockAsync instance; private final RLockAsync instance;
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) { public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name); this(connectionManager, name, new RedissonLock(connectionManager, name));
instance = createLock(connectionManager, name);
} }
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, RLockAsync instance) {
return new RedissonLock(commandExecutor, name); super(connectionManager, name, instance);
this.instance = instance;
} }
@Override @Override

@ -65,13 +65,21 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
private final RMapCacheAsync<K, V> mapCache; private final RMapCacheAsync<K, V> mapCache;
public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) { public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(commandExecutor, name); this(commandExecutor, name, options, new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options));
this.mapCache = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options);
} }
public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapCacheAsync<K, V> mapCache) {
super(commandExecutor, name, mapCache);
this.mapCache = mapCache;
}
public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) { public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(codec, commandExecutor, name); this(codec, commandExecutor, name, options, new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options));
this.mapCache = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options); }
public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapCacheAsync<K, V> mapCache) {
super(codec, commandExecutor, name, mapCache);
this.mapCache = mapCache;
} }
@Override @Override

@ -15,7 +15,6 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -55,13 +54,21 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
private final RMapAsync<K, V> instance; private final RMapAsync<K, V> instance;
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) { public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(commandExecutor, name); this(commandExecutor, name, options, new RedissonMap<K, V>(commandExecutor, name, null, options));
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, options); }
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapAsync<K, V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
} }
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) { public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(codec, commandExecutor, name); this(codec, commandExecutor, name, options, new RedissonMap<K, V>(codec, commandExecutor, name, null, options));
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, options); }
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options, RMapAsync<K, V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;
} }
@Override @Override

@ -21,10 +21,10 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonReference; import org.redisson.RedissonReference;
import org.redisson.api.RExpirableAsync;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RObjectReactive; import org.redisson.api.RObjectReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonObjectFactory;
@ -42,19 +42,21 @@ abstract class RedissonObjectReactive implements RObjectReactive {
final CommandReactiveExecutor commandExecutor; final CommandReactiveExecutor commandExecutor;
private final String name; private final String name;
final Codec codec; final Codec codec;
protected RExpirableAsync instance;
public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) {
this.codec = codec; this.codec = codec;
this.name = name; this.name = name;
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.instance = instance;
} }
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) { public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
return commandExecutor.reactive(supplier); return commandExecutor.reactive(supplier);
} }
public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance);
} }
protected <V> Mono<V> newSucceeded(V result) { protected <V> Mono<V> newSucceeded(V result) {
@ -123,33 +125,63 @@ abstract class RedissonObjectReactive implements RObjectReactive {
} }
@Override @Override
public Publisher<Void> rename(String newName) { public Publisher<Void> rename(final String newName) {
return commandExecutor.writeReactive(getName(), RedisCommands.RENAME, getName(), newName); return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.renameAsync(newName);
}
});
} }
@Override @Override
public Publisher<Void> migrate(String host, int port, int database) { public Publisher<Void> migrate(final String host, final int port, final int database, final long timeout) {
return commandExecutor.writeReactive(getName(), RedisCommands.MIGRATE, host, port, getName(), database); return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.migrateAsync(host, port, database, timeout);
}
});
} }
@Override @Override
public Publisher<Boolean> move(int database) { public Publisher<Boolean> move(final int database) {
return commandExecutor.writeReactive(getName(), RedisCommands.MOVE, getName(), database); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.moveAsync(database);
}
});
} }
@Override @Override
public Publisher<Boolean> renamenx(String newName) { public Publisher<Boolean> renamenx(final String newName) {
return commandExecutor.writeReactive(getName(), RedisCommands.RENAMENX, getName(), newName); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.renamenxAsync(newName);
}
});
} }
@Override @Override
public Publisher<Boolean> delete() { public Publisher<Boolean> delete() {
return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName()); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.deleteAsync();
}
});
} }
@Override @Override
public Publisher<Boolean> isExists() { public Publisher<Boolean> isExists() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.EXISTS, getName()); return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.isExistsAsync();
}
});
} }
} }

@ -39,8 +39,8 @@ public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableR
private final RPermitExpirableSemaphoreAsync instance; private final RPermitExpirableSemaphoreAsync instance;
public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name); super(connectionManager, name, new RedissonPermitExpirableSemaphore(connectionManager, name, semaphorePubSub));
instance = new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub); instance = (RPermitExpirableSemaphoreAsync) super.instance;
} }
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {

@ -16,11 +16,9 @@
package org.redisson.reactive; package org.redisson.reactive;
import org.redisson.RedissonReadWriteLock; import org.redisson.RedissonReadWriteLock;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive; import org.redisson.api.RLockReactive;
import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLock;
import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RReadWriteLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
/** /**
@ -33,28 +31,18 @@ public class RedissonReadWriteLockReactive extends RedissonExpirableReactive imp
private final RReadWriteLock instance; private final RReadWriteLock instance;
public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name, new RedissonReadWriteLock(commandExecutor, name));
this.instance = new RedissonReadWriteLock(commandExecutor, name); this.instance = (RReadWriteLock) super.instance;
} }
@Override @Override
public RLockReactive readLock() { public RLockReactive readLock() {
return new RedissonLockReactive(commandExecutor, getName()) { return new RedissonLockReactive(commandExecutor, getName(), instance.readLock());
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return instance.readLock();
}
};
} }
@Override @Override
public RLockReactive writeLock() { public RLockReactive writeLock() {
return new RedissonLockReactive(commandExecutor, getName()) { return new RedissonLockReactive(commandExecutor, getName(), instance.writeLock());
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return instance.writeLock();
}
};
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -48,13 +47,21 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
private final RScoredSortedSetAsync<V> instance; private final RScoredSortedSetAsync<V> instance;
public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); this(commandExecutor, name, new RedissonScoredSortedSet<V>(commandExecutor, name, null));
instance = new RedissonScoredSortedSet<V>(commandExecutor, name, null);
} }
public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); this(codec, commandExecutor, name, new RedissonScoredSortedSet<V>(codec, commandExecutor, name, null));
instance = new RedissonScoredSortedSet<V>(codec, commandExecutor, name, null); }
public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;
} }
@Override @Override

@ -36,8 +36,8 @@ public class RedissonSemaphoreReactive extends RedissonExpirableReactive impleme
private final RSemaphoreAsync instance; private final RSemaphoreAsync instance;
public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name); super(connectionManager, name, new RedissonSemaphore(connectionManager, name, semaphorePubSub));
instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub); instance = (RSemaphoreAsync) super.instance;
} }
@Override @Override

@ -15,7 +15,6 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -25,7 +24,9 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache; import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RSetCacheAsync;
import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetCacheReactive;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -60,18 +61,27 @@ import reactor.core.publisher.Flux;
*/ */
public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> { public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> {
private final RedissonSetCache<V> instance; private final RSetCacheAsync<V> instance;
public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); this(commandExecutor, name, new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null));
instance = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null); }
public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
} }
public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); this(codec, commandExecutor, name, new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null));
instance = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
} }
public RedissonSetCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;
}
@Override @Override
public Publisher<Integer> size() { public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName()); return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName());
@ -91,7 +101,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() { return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() {
@Override @Override
public RFuture<ListScanResult<ScanObjectEntry>> get() { public RFuture<ListScanResult<ScanObjectEntry>> get() {
return instance.scanIteratorAsync(getName(), client, startPos, null); return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null);
} }
}); });
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -26,6 +25,7 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonSet; import org.redisson.RedissonSet;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RSetAsync;
import org.redisson.api.RSetReactive; import org.redisson.api.RSetReactive;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -46,17 +46,26 @@ import reactor.core.publisher.Flux;
*/ */
public class RedissonSetReactive<V> extends RedissonExpirableReactive implements RSetReactive<V> { public class RedissonSetReactive<V> extends RedissonExpirableReactive implements RSetReactive<V> {
private final RedissonSet<V> instance; private final RSetAsync<V> instance;
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) { public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); this(commandExecutor, name, new RedissonSet<V>(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null));
instance = new RedissonSet<V>(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null); }
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name, RSetAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
} }
public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); this(codec, commandExecutor, name, new RedissonSet<V>(codec, commandExecutor, name, null));
instance = new RedissonSet<V>(codec, commandExecutor, name, null);
} }
public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;
}
@Override @Override
public Publisher<Integer> addAll(Publisher<? extends V> c) { public Publisher<Integer> addAll(Publisher<? extends V> c) {

@ -0,0 +1,119 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RFuture;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTransaction;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.transaction.RedissonTransaction;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonTransactionReactive implements RTransactionReactive {
private final RTransaction transaction;
private final CommandReactiveExecutor executorService;
public RedissonTransactionReactive(CommandReactiveExecutor executorService, TransactionOptions options) {
this.transaction = new RedissonTransaction(executorService, options);
this.executorService = executorService;
}
@Override
public <V> RBucketReactive<V> getBucket(String name) {
return new RedissonBucketReactive<V>(executorService, name, transaction.<V>getBucket(name));
}
@Override
public <V> RBucketReactive<V> getBucket(String name, Codec codec) {
return new RedissonBucketReactive<V>(codec, executorService, name, transaction.<V>getBucket(name, codec));
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name) {
return new RedissonMapReactive<K, V>(executorService, name, null, transaction.<K, V>getMap(name));
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
return new RedissonMapReactive<K, V>(codec, executorService, name, null, transaction.<K, V>getMap(name, codec));
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(codec, executorService, name, null, transaction.<K, V>getMapCache(name, codec));
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(executorService, name, null, transaction.<K, V>getMapCache(name));
}
@Override
public <V> RSetReactive<V> getSet(String name) {
return new RedissonSetReactive<V>(executorService, name, transaction.<V>getSet(name));
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
return new RedissonSetReactive<V>(codec, executorService, name, transaction.<V>getSet(name, codec));
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(executorService, name, transaction.<V>getSetCache(name));
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, executorService, name, transaction.<V>getSetCache(name, codec));
}
@Override
public Publisher<Void> commit() {
return executorService.reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return transaction.commitAsync();
}
});
}
@Override
public Publisher<Void> rollback() {
return executorService.reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return transaction.rollbackAsync();
}
});
}
}

@ -166,8 +166,6 @@ public class BaseTransactionalMap<K, V> {
result.trySuccess(exists); result.trySuccess(exists);
} }
}); });
result.trySuccess(null);
return result; return result;
} }
@ -512,7 +510,7 @@ public class BaseTransactionalMap<K, V> {
executeLocked(result, new Runnable() { executeLocked(result, new Runnable() {
@Override @Override
public void run() { public void run() {
AtomicLong counter = new AtomicLong(); final AtomicLong counter = new AtomicLong();
List<K> keyList = Arrays.asList(keys); List<K> keyList = Arrays.asList(keys);
for (Iterator<K> iterator = keyList.iterator(); iterator.hasNext();) { for (Iterator<K> iterator = keyList.iterator(); iterator.hasNext();) {
K key = iterator.next(); K key = iterator.next();
@ -536,13 +534,14 @@ public class BaseTransactionalMap<K, V> {
return; return;
} }
for (K key : keys) { for (K key : future.getNow().keySet()) {
HashValue keyHash = toKeyHash(key); HashValue keyHash = toKeyHash(key);
operations.add(new MapFastRemoveOperation(map, key)); operations.add(new MapFastRemoveOperation(map, key));
counter.incrementAndGet();
state.put(keyHash, MapEntry.NULL); state.put(keyHash, MapEntry.NULL);
} }
result.trySuccess(null); result.trySuccess(counter.get());
} }
}); });
} }

@ -0,0 +1,68 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction;
import org.redisson.client.codec.Codec;
/**
*
* @author Nikita Koksharov
*
*/
public class HashKey {
final Codec codec;
final String name;
public HashKey(String name, Codec codec) {
this.name = name;
this.codec = codec;
}
public Codec getCodec() {
return codec;
}
public String getName() {
return name;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
HashKey other = (HashKey) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
}

@ -0,0 +1,43 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author Nikita Koksharov
*
*/
public class HashValue {
private final AtomicInteger counter = new AtomicInteger();
private final List<byte[]> keyIds = new ArrayList<byte[]>();
public HashValue() {
}
public AtomicInteger getCounter() {
return counter;
}
public List<byte[]> getKeyIds() {
return keyIds;
}
}

@ -33,6 +33,7 @@ import org.redisson.RedissonLocalCachedMap;
import org.redisson.RedissonObject; import org.redisson.RedissonObject;
import org.redisson.RedissonTopic; import org.redisson.RedissonTopic;
import org.redisson.api.BatchOptions; import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RBucket; import org.redisson.api.RBucket;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap; import org.redisson.api.RLocalCachedMap;
@ -54,10 +55,15 @@ import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService; import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.map.MapOperation; import org.redisson.transaction.operation.map.MapOperation;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -162,6 +168,73 @@ public class RedissonTransaction implements RTransaction {
return new RedissonTransactionalMapCache<K, V>(codec, commandExecutor, name, operations, options.getTimeout(), executed); return new RedissonTransactionalMapCache<K, V>(codec, commandExecutor, name, operations, options.getTimeout(), executed);
} }
@Override
public RFuture<Void> commitAsync() {
checkState();
checkTimeout();
final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager());
for (TransactionalOperation transactionalOperation : operations) {
System.out.println("transactionalOperation " + transactionalOperation);
transactionalOperation.commit(transactionExecutor);
}
final String id = generateId();
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id);
future.addListener(new FutureListener<Map<HashKey, HashValue>>() {
@Override
public void operationComplete(Future<Map<HashKey, HashValue>> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(new TransactionException("Unable to execute transaction", future.cause()));
return;
}
final Map<HashKey, HashValue> hashes = future.getNow();
try {
checkTimeout();
} catch (TransactionTimeoutException e) {
enableLocalCacheAsync(id, hashes);
result.tryFailure(e);
return;
}
int syncSlaves = 0;
if (!commandExecutor.getConnectionManager().isClusterMode()) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next();
syncSlaves = entry.getAvailableClients() - 1;
}
BatchOptions batchOptions = BatchOptions.defaults()
.syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(options.getRetryAttempts())
.retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS)
.atomic();
RFuture<Object> transactionFuture = transactionExecutor.executeAsync(batchOptions);
transactionFuture.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(new TransactionException("Unable to execute transaction", future.cause()));
return;
}
enableLocalCacheAsync(id, hashes);
operations.clear();
executed.set(true);
result.trySuccess(null);
}
});
}
});
return result;
}
@Override @Override
public void commit() { public void commit() {
checkState(); checkState();
@ -175,7 +248,7 @@ public class RedissonTransaction implements RTransaction {
} }
String id = generateId(); String id = generateId();
Map<TransactionalOperation, List<byte[]>> hashes = disableLocalCache(id); Map<HashKey, HashValue> hashes = disableLocalCache(id);
try { try {
checkTimeout(); checkTimeout();
@ -205,6 +278,7 @@ public class RedissonTransaction implements RTransaction {
enableLocalCache(id, hashes); enableLocalCache(id, hashes);
operations.clear();
executed.set(true); executed.set(true);
} }
@ -214,16 +288,32 @@ public class RedissonTransaction implements RTransaction {
} }
} }
private void enableLocalCache(String requestId, Map<TransactionalOperation, List<byte[]>> hashes) { private RFuture<BatchResult<?>> enableLocalCacheAsync(String requestId, Map<HashKey, HashValue> hashes) {
if (hashes.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync<Object> topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]));
topic.publishAsync(msg);
}
return publishBatch.executeAsync();
}
private void enableLocalCache(String requestId, Map<HashKey, HashValue> hashes) {
if (hashes.isEmpty()) { if (hashes.isEmpty()) {
return; return;
} }
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) { for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX); String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync<Object> topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE); RTopicAsync<Object> topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().toArray(new byte[entry.getValue().size()][])); LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]));
topic.publishAsync(msg); topic.publishAsync(msg);
} }
@ -232,28 +322,28 @@ public class RedissonTransaction implements RTransaction {
} catch (Exception e) { } catch (Exception e) {
// skip it. Disabled local cache entries are enabled once reach timeout. // skip it. Disabled local cache entries are enabled once reach timeout.
} }
} }
private Map<TransactionalOperation, List<byte[]>> disableLocalCache(String requestId) { private Map<HashKey, HashValue> disableLocalCache(String requestId) {
if (localCaches.isEmpty()) { if (localCaches.isEmpty()) {
return Collections.emptyMap(); return Collections.emptyMap();
} }
Map<TransactionalOperation, List<byte[]>> hashes = new HashMap<TransactionalOperation, List<byte[]>>(localCaches.size()); Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) { for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) { if (localCaches.contains(transactionalOperation.getName())) {
MapOperation mapOperation = (MapOperation) transactionalOperation; MapOperation mapOperation = (MapOperation) transactionalOperation;
RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>)mapOperation.getMap(); RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>)mapOperation.getMap();
HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec());
byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash(); byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash();
List<byte[]> list = hashes.get(transactionalOperation); HashValue value = hashes.get(hashKey);
if (list == null) { if (value == null) {
list = new ArrayList<byte[]>(); value = new HashValue();
hashes.put(transactionalOperation, list); hashes.put(hashKey, value);
} }
list.add(key); value.getKeyIds().add(key);
String disabledKeysName = RedissonObject.suffixName(transactionalOperation.getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); String disabledKeysName = RedissonObject.suffixName(transactionalOperation.getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = batch.getListMultimapCache(disabledKeysName, transactionalOperation.getCodec()); RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = batch.getListMultimapCache(disabledKeysName, transactionalOperation.getCodec());
@ -269,18 +359,16 @@ public class RedissonTransaction implements RTransaction {
throw new TransactionException("Unable to execute transaction over local cached map objects: " + localCaches, e); throw new TransactionException("Unable to execute transaction over local cached map objects: " + localCaches, e);
} }
final Map<String, AtomicInteger> map = new HashMap<String, AtomicInteger>();
final CountDownLatch latch = new CountDownLatch(hashes.size()); final CountDownLatch latch = new CountDownLatch(hashes.size());
List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>(); List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
for (final Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) { for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
RTopic<Object> topic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE, RTopic<Object> topic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX)); commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX));
topics.add(topic); topics.add(topic);
map.put(entry.getKey().getName(), new AtomicInteger());
topic.addListener(new MessageListener<Object>() { topic.addListener(new MessageListener<Object>() {
@Override @Override
public void onMessage(String channel, Object msg) { public void onMessage(String channel, Object msg) {
AtomicInteger counter = map.get(entry.getKey().getName()); AtomicInteger counter = entry.getValue().getCounter();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
latch.countDown(); latch.countDown();
} }
@ -289,7 +377,7 @@ public class RedissonTransaction implements RTransaction {
} }
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (final Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) { for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec()); RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec());
LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout()); LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout());
@ -297,7 +385,7 @@ public class RedissonTransaction implements RTransaction {
RTopicAsync<Object> topic = publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX), LocalCachedMessageCodec.INSTANCE); RTopicAsync<Object> topic = publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX), LocalCachedMessageCodec.INSTANCE);
RFuture<Long> future = topic.publishAsync(new LocalCachedMapDisable(requestId, RFuture<Long> future = topic.publishAsync(new LocalCachedMapDisable(requestId,
entry.getValue().toArray(new byte[entry.getValue().size()][]), options.getResponseTimeout())); entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]), options.getResponseTimeout()));
future.addListener(new FutureListener<Long>() { future.addListener(new FutureListener<Long>() {
@Override @Override
public void operationComplete(Future<Long> future) throws Exception { public void operationComplete(Future<Long> future) throws Exception {
@ -306,7 +394,7 @@ public class RedissonTransaction implements RTransaction {
} }
int receivers = future.getNow().intValue(); int receivers = future.getNow().intValue();
AtomicInteger counter = map.get(entry.getKey().getName()); AtomicInteger counter = entry.getValue().getCounter();
if (counter.addAndGet(receivers) == 0) { if (counter.addAndGet(receivers) == 0) {
latch.countDown(); latch.countDown();
} }
@ -332,6 +420,139 @@ public class RedissonTransaction implements RTransaction {
return hashes; return hashes;
} }
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(final String requestId) {
if (localCaches.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.<HashKey, HashValue>emptyMap());
}
final RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<Map<HashKey, HashValue>>();
final Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
MapOperation mapOperation = (MapOperation) transactionalOperation;
RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>)mapOperation.getMap();
HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec());
byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash();
HashValue value = hashes.get(hashKey);
if (value == null) {
value = new HashValue();
hashes.put(hashKey, value);
}
value.getKeyIds().add(key);
String disabledKeysName = RedissonObject.suffixName(transactionalOperation.getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = batch.getListMultimapCache(disabledKeysName, transactionalOperation.getCodec());
LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout());
multimap.putAsync(localCacheKey, ByteBufUtil.hexDump(key));
multimap.expireKeyAsync(localCacheKey, options.getResponseTimeout(), TimeUnit.MILLISECONDS);
}
}
RFuture<BatchResult<?>> batchListener = batch.executeAsync();
batchListener.addListener(new FutureListener<BatchResult<?>>() {
@Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
final CountableListener<Map<HashKey, HashValue>> listener =
new CountableListener<Map<HashKey, HashValue>>(result, hashes);
listener.setCounter(hashes.size());
RPromise<Void> subscriptionFuture = new RedissonPromise<Void>();
final CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null);
subscribedFutures.setCounter(hashes.size());
final List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
final String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX);
RTopic<Object> topic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE,
commandExecutor, disabledAckName);
topics.add(topic);
RFuture<Integer> topicFuture = topic.addListenerAsync(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
AtomicInteger counter = entry.getValue().getCounter();
if (counter.decrementAndGet() == 0) {
listener.decCounter();
}
}
});
topicFuture.addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
subscribedFutures.decCounter();
}
});
}
subscriptionFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec());
LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout());
multimap.removeAllAsync(localCacheKey);
RTopicAsync<Object> topic = publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX), LocalCachedMessageCodec.INSTANCE);
RFuture<Long> publishFuture = topic.publishAsync(new LocalCachedMapDisable(requestId,
entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]), options.getResponseTimeout()));
publishFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
int receivers = future.getNow().intValue();
AtomicInteger counter = entry.getValue().getCounter();
if (counter.addAndGet(receivers) == 0) {
listener.decCounter();
}
}
});
}
RFuture<BatchResult<?>> publishFuture = publishBatch.executeAsync();
publishFuture.addListener(new FutureListener<BatchResult<?>>() {
@Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception {
result.addListener(new FutureListener<Map<HashKey, HashValue>>() {
@Override
public void operationComplete(Future<Map<HashKey, HashValue>> future)
throws Exception {
for (RTopic<Object> topic : topics) {
topic.removeAllListeners();
}
}
});
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
result.tryFailure(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms"));
}
}, options.getResponseTimeout(), TimeUnit.MILLISECONDS);
}
});
}
});
}
});
return result;
}
protected static String generateId() { protected static String generateId() {
byte[] id = new byte[16]; byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom // TODO JDK UPGRADE replace to native ThreadLocalRandom
@ -351,12 +572,39 @@ public class RedissonTransaction implements RTransaction {
try { try {
executorService.execute(BatchOptions.defaults()); executorService.execute(BatchOptions.defaults());
} catch (Exception e) { } catch (Exception e) {
throw new TransactionException("Unable to execute transaction", e); throw new TransactionException("Unable to rollback transaction", e);
} }
operations.clear(); operations.clear();
executed.set(true); executed.set(true);
} }
@Override
public RFuture<Void> rollbackAsync() {
checkState();
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.rollback(executorService);
}
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Object> future = executorService.executeAsync(BatchOptions.defaults());
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(new TransactionException("Unable to rollback transaction", future.cause()));
return;
}
operations.clear();
executed.set(true);
result.trySuccess(null);
}
});
return result;
}
protected void checkState() { protected void checkState() {
if (executed.get()) { if (executed.get()) {

@ -155,8 +155,6 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
result.trySuccess(future.getNow()); result.trySuccess(future.getNow());
} }
}); });
result.trySuccess(null);
} }
}); });
return result; return result;
@ -193,8 +191,6 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
result.trySuccess(future.getNow()); result.trySuccess(future.getNow());
} }
}); });
result.trySuccess(null);
} }
}); });
return result; return result;
@ -231,8 +227,6 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
result.trySuccess(future.getNow()); result.trySuccess(future.getNow());
} }
}); });
result.trySuccess(null);
} }
}); });
return result; return result;

@ -0,0 +1,148 @@
package org.redisson.transaction;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.BaseReactiveTest;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RTransactionReactive;
import org.redisson.api.TransactionOptions;
public class RedissonTransactionalBucketReactiveTest extends BaseReactiveTest {
@Test
public void testTimeout() throws InterruptedException {
RBucketReactive<String> b = redisson.getBucket("test");
sync(b.set("123"));
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults().timeout(3, TimeUnit.SECONDS));
RBucketReactive<String> bucket = transaction.getBucket("test");
sync(bucket.set("234"));
Thread.sleep(3000);
try {
sync(transaction.commit());
Assert.fail();
} catch (TransactionException e) {
// skip
}
Thread.sleep(1000);
assertThat(sync(b.get())).isEqualTo("123");
}
@Test
public void testSet() {
RBucketReactive<String> b = redisson.getBucket("test");
sync(b.set("123"));
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
sync(bucket.set("234"));
assertThat(sync(bucket.get())).isEqualTo("234");
sync(transaction.commit());
assertThat(sync(redisson.getKeys().count())).isEqualTo(1);
assertThat(sync(b.get())).isEqualTo("234");
}
@Test
public void testGetAndSet() {
RBucketReactive<String> b = redisson.getBucket("test");
sync(b.set("123"));
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
assertThat(sync(bucket.getAndSet("0"))).isEqualTo("123");
assertThat(sync(bucket.get())).isEqualTo("0");
assertThat(sync(bucket.getAndSet("324"))).isEqualTo("0");
sync(transaction.commit());
assertThat(sync(redisson.getKeys().count())).isEqualTo(1);
assertThat(sync(b.get())).isEqualTo("324");
}
@Test
public void testCompareAndSet() {
RBucketReactive<String> b = redisson.getBucket("test");
sync(b.set("123"));
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
assertThat(sync(bucket.compareAndSet("0", "434"))).isFalse();
assertThat(sync(bucket.get())).isEqualTo("123");
assertThat(sync(bucket.compareAndSet("123", "232"))).isTrue();
assertThat(sync(bucket.get())).isEqualTo("232");
sync(transaction.commit());
assertThat(sync(redisson.getKeys().count())).isEqualTo(1);
assertThat(sync(b.get())).isEqualTo("232");
}
@Test
public void testTrySet() {
RBucketReactive<String> b = redisson.getBucket("test");
sync(b.set("123"));
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> bucket = transaction.getBucket("test");
assertThat(sync(bucket.trySet("0"))).isFalse();
assertThat(sync(bucket.delete())).isTrue();
assertThat(sync(bucket.trySet("324"))).isTrue();
assertThat(sync(bucket.trySet("43"))).isFalse();
sync(transaction.commit());
assertThat(sync(redisson.getKeys().count())).isEqualTo(1);
assertThat(sync(b.get())).isEqualTo("324");
}
@Test
public void testGetAndRemove() {
RBucketReactive<String> m = redisson.getBucket("test");
sync(m.set("123"));
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<String> set = transaction.getBucket("test");
assertThat(sync(set.get())).isEqualTo("123");
assertThat(sync(set.size())).isEqualTo(5);
assertThat(sync(set.getAndDelete())).isEqualTo("123");
assertThat(sync(set.size())).isEqualTo(0);
assertThat(sync(set.get())).isNull();
assertThat(sync(set.getAndDelete())).isNull();
sync(transaction.commit());
assertThat(sync(redisson.getKeys().count())).isEqualTo(0);
assertThat(sync(m.get())).isNull();
}
@Test
public void testRollback() {
RBucketReactive<Object> b = redisson.getBucket("test");
sync(b.set("1234"));
RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults());
RBucketReactive<Object> bucket = transaction.getBucket("test");
assertThat(sync(bucket.get())).isEqualTo("1234");
assertThat(sync(bucket.getAndDelete())).isEqualTo("1234");
assertThat(sync(b.get())).isEqualTo("1234");
sync(transaction.rollback());
assertThat(sync(redisson.getKeys().count())).isEqualTo(1);
assertThat(sync(b.get())).isEqualTo("1234");
}
}
Loading…
Cancel
Save