From 5b01a444b07ee648e063c34d84d34538c6b26d33 Mon Sep 17 00:00:00 2001 From: Rui Gu Date: Wed, 18 Apr 2018 10:46:20 +0100 Subject: [PATCH 1/4] updated javadocs url to latest version --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9dad30fa8..cb3e95a11 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ Redisson: Redis based In-Memory Data Grid for Java. ==== -[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.5.7) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** +[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. From d5b930618c1db98f71ff8bb5f81feb1e0dd067c7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Apr 2018 12:54:46 +0300 Subject: [PATCH 2/4] commitAsync and rollbackAsync methods added to RTransaction --- .../main/java/org/redisson/RedissonTopic.java | 19 ++ .../java/org/redisson/api/RTopicAsync.java | 15 +- .../java/org/redisson/api/RTransaction.java | 10 + .../redisson/connection/MasterSlaveEntry.java | 1 + .../connection/SentinelConnectionManager.java | 1 + .../balancer/LoadBalancerManager.java | 2 +- .../CountableListener.java | 22 +- .../transaction/BaseTransactionalMap.java | 7 +- .../org/redisson/transaction/HashKey.java | 53 ++++ .../org/redisson/transaction/HashValue.java | 28 ++ .../transaction/RedissonTransaction.java | 290 ++++++++++++++++-- .../RedissonTransactionalBucket.java | 6 - 12 files changed, 411 insertions(+), 43 deletions(-) rename redisson/src/main/java/org/redisson/{connection => misc}/CountableListener.java (95%) create mode 100644 redisson/src/main/java/org/redisson/transaction/HashKey.java create mode 100644 redisson/src/main/java/org/redisson/transaction/HashValue.java diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 9d1b8083a..93fd84f75 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -104,6 +104,25 @@ public class RedissonTopic implements RTopic { PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); return addListener(pubSubListener); } + + @Override + public RFuture addListenerAsync(final MessageListener listener) { + PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); + RFuture future = subscribeService.subscribe(codec, name, pubSubListener); + RPromise result = new RedissonPromise(); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(System.identityHashCode(pubSubListener)); + } + }); + return result; + } private int addListener(RedisPubSubListener pubSubListener) { RFuture future = subscribeService.subscribe(codec, name, pubSubListener); diff --git a/redisson/src/main/java/org/redisson/api/RTopicAsync.java b/redisson/src/main/java/org/redisson/api/RTopicAsync.java index 6d91a74a6..4af810768 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTopicAsync.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import org.redisson.api.listener.MessageListener; + /** * Distributed topic. Messages are delivered to all message listeners across Redis cluster. * @@ -28,8 +30,19 @@ public interface RTopicAsync { * Publish the message to all subscribers of this topic asynchronously * * @param message to send - * @return the RFuture object with number of clients that received the message + * @return number of clients that received the message */ RFuture publishAsync(M message); + + /** + * Subscribes to this topic. + * MessageListener.onMessage is called when any message + * is published on this topic. + * + * @param listener for messages + * @return locally unique listener id + * @see org.redisson.api.listener.MessageListener + */ + RFuture addListenerAsync(MessageListener listener); } diff --git a/redisson/src/main/java/org/redisson/api/RTransaction.java b/redisson/src/main/java/org/redisson/api/RTransaction.java index 6a233e9cb..acdfa1149 100644 --- a/redisson/src/main/java/org/redisson/api/RTransaction.java +++ b/redisson/src/main/java/org/redisson/api/RTransaction.java @@ -157,10 +157,20 @@ public interface RTransaction { * Commits all changes made on this transaction. */ void commit(); + + /** + * Commits all changes made on this transaction in async mode. + */ + RFuture commitAsync(); /** * Rollback all changes made on this transaction. */ void rollback(); + /** + * Rollback all changes made on this transaction in async mode. + */ + RFuture rollbackAsync(); + } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 5b51ea5c3..b1155f0a9 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -42,6 +42,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool; +import org.redisson.misc.CountableListener; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.TransferListener; diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 20c140115..1192a8863 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -44,6 +44,7 @@ import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.SentinelServersConfig; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.misc.CountableListener; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.URIBuilder; diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 35d1c5d9f..d4d14f0d0 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -33,10 +33,10 @@ import org.redisson.config.ReadMode; import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.CountableListener; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.pool.PubSubConnectionPool; import org.redisson.connection.pool.SlaveConnectionPool; +import org.redisson.misc.CountableListener; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.URIBuilder; diff --git a/redisson/src/main/java/org/redisson/connection/CountableListener.java b/redisson/src/main/java/org/redisson/misc/CountableListener.java similarity index 95% rename from redisson/src/main/java/org/redisson/connection/CountableListener.java rename to redisson/src/main/java/org/redisson/misc/CountableListener.java index 94e681e66..ade47d5ac 100644 --- a/redisson/src/main/java/org/redisson/connection/CountableListener.java +++ b/redisson/src/main/java/org/redisson/misc/CountableListener.java @@ -13,12 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.connection; +package org.redisson.misc; import java.util.concurrent.atomic.AtomicInteger; -import org.redisson.misc.RPromise; - import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -51,6 +49,15 @@ public class CountableListener implements FutureListener { counter.incrementAndGet(); } + public void decCounter() { + if (counter.decrementAndGet() == 0) { + onSuccess(value); + if (result != null) { + result.trySuccess(value); + } + } + } + @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { @@ -59,13 +66,8 @@ public class CountableListener implements FutureListener { } return; } - - if (counter.decrementAndGet() == 0) { - onSuccess(value); - if (result != null) { - result.trySuccess(value); - } - } + + decCounter(); } protected void onSuccess(T value) { diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index a8bb50469..df9efb475 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -166,8 +166,6 @@ public class BaseTransactionalMap { result.trySuccess(exists); } }); - - result.trySuccess(null); return result; } @@ -536,13 +534,14 @@ public class BaseTransactionalMap { return; } - for (K key : keys) { + for (K key : future.getNow().keySet()) { HashValue keyHash = toKeyHash(key); operations.add(new MapFastRemoveOperation(map, key)); + counter.incrementAndGet(); state.put(keyHash, MapEntry.NULL); } - result.trySuccess(null); + result.trySuccess(counter.get()); } }); } diff --git a/redisson/src/main/java/org/redisson/transaction/HashKey.java b/redisson/src/main/java/org/redisson/transaction/HashKey.java new file mode 100644 index 000000000..97be23502 --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/HashKey.java @@ -0,0 +1,53 @@ +package org.redisson.transaction; + +import org.redisson.client.codec.Codec; + +/** + * + * @author Nikita Koksharov + * + */ +public class HashKey { + + final Codec codec; + final String name; + + public HashKey(String name, Codec codec) { + this.name = name; + this.codec = codec; + } + + public Codec getCodec() { + return codec; + } + + public String getName() { + return name; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HashKey other = (HashKey) obj; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + return true; + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/HashValue.java b/redisson/src/main/java/org/redisson/transaction/HashValue.java new file mode 100644 index 000000000..7977a2de2 --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/HashValue.java @@ -0,0 +1,28 @@ +package org.redisson.transaction; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + * @author Nikita Koksharov + * + */ +public class HashValue { + + private final AtomicInteger counter = new AtomicInteger(); + private final List keyIds = new ArrayList(); + + public HashValue() { + } + + public AtomicInteger getCounter() { + return counter; + } + + public List getKeyIds() { + return keyIds; + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index bbc057391..8fd79fdd5 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -33,6 +34,7 @@ import org.redisson.RedissonLocalCachedMap; import org.redisson.RedissonObject; import org.redisson.RedissonTopic; import org.redisson.api.BatchOptions; +import org.redisson.api.BatchResult; import org.redisson.api.RBucket; import org.redisson.api.RFuture; import org.redisson.api.RLocalCachedMap; @@ -54,10 +56,15 @@ import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; +import org.redisson.misc.CountableListener; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.map.MapOperation; import io.netty.buffer.ByteBufUtil; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; @@ -162,6 +169,72 @@ public class RedissonTransaction implements RTransaction { return new RedissonTransactionalMapCache(codec, commandExecutor, name, operations, options.getTimeout(), executed); } + @Override + public RFuture commitAsync() { + checkState(); + + checkTimeout(); + + + CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager()); + for (TransactionalOperation transactionalOperation : operations) { + transactionalOperation.commit(transactionExecutor); + } + + String id = generateId(); + RPromise result = new RedissonPromise(); + RFuture> future = disableLocalCacheAsync(id); + future.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(new TransactionException("Unable to execute transaction", future.cause())); + return; + } + + Map hashes = future.getNow(); + try { + checkTimeout(); + } catch (TransactionTimeoutException e) { + enableLocalCacheAsync(id, hashes); + result.tryFailure(e); + return; + } + + int syncSlaves = 0; + if (!commandExecutor.getConnectionManager().isClusterMode()) { + MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next(); + syncSlaves = entry.getAvailableClients() - 1; + } + + BatchOptions batchOptions = BatchOptions.defaults() + .syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS) + .responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS) + .retryAttempts(options.getRetryAttempts()) + .retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS) + .atomic(); + + RFuture transactionFuture = transactionExecutor.executeAsync(batchOptions); + transactionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(new TransactionException("Unable to execute transaction", future.cause())); + return; + } + + enableLocalCacheAsync(id, hashes); + operations.clear(); + executed.set(true); + + result.trySuccess(null); + } + }); + } + }); + return result; + } + @Override public void commit() { checkState(); @@ -175,7 +248,7 @@ public class RedissonTransaction implements RTransaction { } String id = generateId(); - Map> hashes = disableLocalCache(id); + Map hashes = disableLocalCache(id); try { checkTimeout(); @@ -205,6 +278,7 @@ public class RedissonTransaction implements RTransaction { enableLocalCache(id, hashes); + operations.clear(); executed.set(true); } @@ -214,16 +288,32 @@ public class RedissonTransaction implements RTransaction { } } - private void enableLocalCache(String requestId, Map> hashes) { + private RFuture> enableLocalCacheAsync(String requestId, Map hashes) { + if (hashes.isEmpty()) { + return RedissonPromise.newSucceededFuture(null); + } + + RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + for (Entry entry : hashes.entrySet()) { + String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX); + RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE); + LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][])); + topic.publishAsync(msg); + } + + return publishBatch.executeAsync(); + } + + private void enableLocalCache(String requestId, Map hashes) { if (hashes.isEmpty()) { return; } RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); - for (Entry> entry : hashes.entrySet()) { + for (Entry entry : hashes.entrySet()) { String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX); RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE); - LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().toArray(new byte[entry.getValue().size()][])); + LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][])); topic.publishAsync(msg); } @@ -232,28 +322,28 @@ public class RedissonTransaction implements RTransaction { } catch (Exception e) { // skip it. Disabled local cache entries are enabled once reach timeout. } - } - - private Map> disableLocalCache(String requestId) { + + private Map disableLocalCache(String requestId) { if (localCaches.isEmpty()) { return Collections.emptyMap(); } - Map> hashes = new HashMap>(localCaches.size()); + Map hashes = new HashMap(localCaches.size()); RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); for (TransactionalOperation transactionalOperation : operations) { if (localCaches.contains(transactionalOperation.getName())) { MapOperation mapOperation = (MapOperation) transactionalOperation; RedissonLocalCachedMap map = (RedissonLocalCachedMap)mapOperation.getMap(); + HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec()); byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash(); - List list = hashes.get(transactionalOperation); - if (list == null) { - list = new ArrayList(); - hashes.put(transactionalOperation, list); + HashValue value = hashes.get(hashKey); + if (value == null) { + value = new HashValue(); + hashes.put(hashKey, value); } - list.add(key); + value.getKeyIds().add(key); String disabledKeysName = RedissonObject.suffixName(transactionalOperation.getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); RMultimapCacheAsync multimap = batch.getListMultimapCache(disabledKeysName, transactionalOperation.getCodec()); @@ -269,18 +359,16 @@ public class RedissonTransaction implements RTransaction { throw new TransactionException("Unable to execute transaction over local cached map objects: " + localCaches, e); } - final Map map = new HashMap(); final CountDownLatch latch = new CountDownLatch(hashes.size()); List> topics = new ArrayList>(); - for (final Entry> entry : hashes.entrySet()) { + for (final Entry entry : hashes.entrySet()) { RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX)); topics.add(topic); - map.put(entry.getKey().getName(), new AtomicInteger()); topic.addListener(new MessageListener() { @Override public void onMessage(String channel, Object msg) { - AtomicInteger counter = map.get(entry.getKey().getName()); + AtomicInteger counter = entry.getValue().getCounter(); if (counter.decrementAndGet() == 0) { latch.countDown(); } @@ -289,7 +377,7 @@ public class RedissonTransaction implements RTransaction { } RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); - for (final Entry> entry : hashes.entrySet()) { + for (final Entry entry : hashes.entrySet()) { String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); RMultimapCacheAsync multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec()); LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout()); @@ -297,7 +385,7 @@ public class RedissonTransaction implements RTransaction { RTopicAsync topic = publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX), LocalCachedMessageCodec.INSTANCE); RFuture future = topic.publishAsync(new LocalCachedMapDisable(requestId, - entry.getValue().toArray(new byte[entry.getValue().size()][]), options.getResponseTimeout())); + entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]), options.getResponseTimeout())); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -306,7 +394,7 @@ public class RedissonTransaction implements RTransaction { } int receivers = future.getNow().intValue(); - AtomicInteger counter = map.get(entry.getKey().getName()); + AtomicInteger counter = entry.getValue().getCounter(); if (counter.addAndGet(receivers) == 0) { latch.countDown(); } @@ -332,6 +420,139 @@ public class RedissonTransaction implements RTransaction { return hashes; } + private RFuture> disableLocalCacheAsync(String requestId) { + if (localCaches.isEmpty()) { + return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + } + + RPromise> result = new RedissonPromise>(); + Map hashes = new HashMap(localCaches.size()); + RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + for (TransactionalOperation transactionalOperation : operations) { + if (localCaches.contains(transactionalOperation.getName())) { + MapOperation mapOperation = (MapOperation) transactionalOperation; + RedissonLocalCachedMap map = (RedissonLocalCachedMap)mapOperation.getMap(); + + HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec()); + byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash(); + HashValue value = hashes.get(hashKey); + if (value == null) { + value = new HashValue(); + hashes.put(hashKey, value); + } + value.getKeyIds().add(key); + + String disabledKeysName = RedissonObject.suffixName(transactionalOperation.getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); + RMultimapCacheAsync multimap = batch.getListMultimapCache(disabledKeysName, transactionalOperation.getCodec()); + LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout()); + multimap.putAsync(localCacheKey, ByteBufUtil.hexDump(key)); + multimap.expireKeyAsync(localCacheKey, options.getResponseTimeout(), TimeUnit.MILLISECONDS); + } + } + + RFuture> batchListener = batch.executeAsync(); + batchListener.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + CountableListener> listener = + new CountableListener>(result, hashes); + listener.setCounter(hashes.size()); + RPromise subscriptionFuture = new RedissonPromise(); + CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null); + subscribedFutures.setCounter(hashes.size()); + + List> topics = new ArrayList>(); + for (final Entry entry : hashes.entrySet()) { + final String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX); + RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, + commandExecutor, disabledAckName); + topics.add(topic); + RFuture topicFuture = topic.addListenerAsync(new MessageListener() { + @Override + public void onMessage(String channel, Object msg) { + AtomicInteger counter = entry.getValue().getCounter(); + if (counter.decrementAndGet() == 0) { + listener.decCounter(); + } + } + }); + topicFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + subscribedFutures.decCounter(); + } + }); + } + + subscriptionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + for (final Entry entry : hashes.entrySet()) { + String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); + RMultimapCacheAsync multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec()); + LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout()); + multimap.removeAllAsync(localCacheKey); + + RTopicAsync topic = publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX), LocalCachedMessageCodec.INSTANCE); + RFuture publishFuture = topic.publishAsync(new LocalCachedMapDisable(requestId, + entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]), options.getResponseTimeout())); + publishFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + int receivers = future.getNow().intValue(); + AtomicInteger counter = entry.getValue().getCounter(); + if (counter.addAndGet(receivers) == 0) { + listener.decCounter(); + } + } + }); + } + + RFuture> publishFuture = publishBatch.executeAsync(); + publishFuture.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + result.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) + throws Exception { + for (RTopic topic : topics) { + topic.removeAllListeners(); + } + } + }); + + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + result.tryFailure(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms")); + } + }, options.getResponseTimeout(), TimeUnit.MILLISECONDS); + } + }); + } + }); + } + }); + + return result; + } + protected static String generateId() { byte[] id = new byte[16]; // TODO JDK UPGRADE replace to native ThreadLocalRandom @@ -351,12 +572,39 @@ public class RedissonTransaction implements RTransaction { try { executorService.execute(BatchOptions.defaults()); } catch (Exception e) { - throw new TransactionException("Unable to execute transaction", e); + throw new TransactionException("Unable to rollback transaction", e); } operations.clear(); executed.set(true); } + + @Override + public RFuture rollbackAsync() { + checkState(); + + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); + for (TransactionalOperation transactionalOperation : operations) { + transactionalOperation.rollback(executorService); + } + + RPromise result = new RedissonPromise(); + RFuture future = executorService.executeAsync(BatchOptions.defaults()); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(new TransactionException("Unable to rollback transaction", future.cause())); + return; + } + + operations.clear(); + executed.set(true); + result.trySuccess(null); + } + }); + return result; + } protected void checkState() { if (executed.get()) { diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java index fbdbf5ca2..29e7f43c1 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java @@ -155,8 +155,6 @@ public class RedissonTransactionalBucket extends RedissonBucket { result.trySuccess(future.getNow()); } }); - - result.trySuccess(null); } }); return result; @@ -193,8 +191,6 @@ public class RedissonTransactionalBucket extends RedissonBucket { result.trySuccess(future.getNow()); } }); - - result.trySuccess(null); } }); return result; @@ -231,8 +227,6 @@ public class RedissonTransactionalBucket extends RedissonBucket { result.trySuccess(future.getNow()); } }); - - result.trySuccess(null); } }); return result; From 220370e47d2f68899c060d120ec3591fe787849d Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Apr 2018 12:54:56 +0300 Subject: [PATCH 3/4] javadoc fixed --- .../java/org/redisson/config/BaseMasterSlaveServersConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java index 47a9ea6d7..0d3f8b99c 100644 --- a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java @@ -132,7 +132,7 @@ public class BaseMasterSlaveServersConfigslaveFailsInterval value. *

From c3655e5ef41d46b1c91181e32ed5d5e68026361c Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Apr 2018 18:27:34 +0300 Subject: [PATCH 4/4] RedissonReactive.createTransaction method added. #1372 --- .../org/redisson/RedissonHyperLogLog.java | 4 +- .../java/org/redisson/RedissonReactive.java | 8 + .../main/java/org/redisson/RedissonSet.java | 6 + .../java/org/redisson/RedissonSetCache.java | 1 + .../main/java/org/redisson/RedissonTopic.java | 4 +- .../main/java/org/redisson/ScanIterator.java | 3 + .../org/redisson/api/RBucketReactive.java | 2 + .../redisson/api/RHyperLogLogReactive.java | 6 + .../java/org/redisson/api/RKeysReactive.java | 137 +++++++++++++++ .../org/redisson/api/RObjectReactive.java | 3 +- .../redisson/api/RTransactionReactive.java | 157 ++++++++++++++++++ .../redisson/api/RedissonReactiveClient.java | 8 + .../RedissonAtomicDoubleReactive.java | 9 +- .../reactive/RedissonAtomicLongReactive.java | 8 +- .../RedissonBaseMultimapReactive.java | 4 +- .../reactive/RedissonBitSetReactive.java | 11 +- .../reactive/RedissonBucketReactive.java | 26 ++- .../reactive/RedissonExpirableReactive.java | 46 +++-- .../reactive/RedissonHyperLogLogReactive.java | 67 +++++--- .../reactive/RedissonKeysReactive.java | 142 ++++++++++++++++ .../reactive/RedissonListReactive.java | 15 +- .../reactive/RedissonLockReactive.java | 9 +- .../reactive/RedissonMapCacheReactive.java | 16 +- .../reactive/RedissonMapReactive.java | 17 +- .../reactive/RedissonObjectReactive.java | 60 +++++-- ...issonPermitExpirableSemaphoreReactive.java | 4 +- .../RedissonReadWriteLockReactive.java | 20 +-- .../RedissonScoredSortedSetReactive.java | 17 +- .../reactive/RedissonSemaphoreReactive.java | 4 +- .../reactive/RedissonSetCacheReactive.java | 24 ++- .../reactive/RedissonSetReactive.java | 21 ++- .../reactive/RedissonTransactionReactive.java | 119 +++++++++++++ .../transaction/BaseTransactionalMap.java | 2 +- .../org/redisson/transaction/HashKey.java | 15 ++ .../org/redisson/transaction/HashValue.java | 15 ++ .../transaction/RedissonTransaction.java | 26 +-- ...dissonTransactionalBucketReactiveTest.java | 148 +++++++++++++++++ 37 files changed, 1046 insertions(+), 138 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RTransactionReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java create mode 100644 redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java diff --git a/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java b/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java index 0cac38234..f28ba0fec 100644 --- a/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java +++ b/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java @@ -34,11 +34,11 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RedissonHyperLogLog extends RedissonExpirable implements RHyperLogLog { - protected RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) { + public RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } - protected RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + public RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index b2129a387..62a91519f 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -52,7 +52,9 @@ import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTopicReactive; +import org.redisson.api.RTransactionReactive; import org.redisson.api.RedissonReactiveClient; +import org.redisson.api.TransactionOptions; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.codec.ReferenceCodecProvider; @@ -88,6 +90,7 @@ import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetMultimapReactive; import org.redisson.reactive.RedissonSetReactive; import org.redisson.reactive.RedissonTopicReactive; +import org.redisson.reactive.RedissonTransactionReactive; /** * Main infrastructure class allows to get access @@ -411,5 +414,10 @@ public class RedissonReactive implements RedissonReactiveClient { public RMapReactive getMap(String name, Codec codec, MapOptions options) { return new RedissonMapReactive(codec, commandExecutor, name, options); } + + @Override + public RTransactionReactive createTransaction(TransactionOptions options) { + return new RedissonTransactionReactive(commandExecutor, options); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index d2fa8cd32..07fe4882b 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -565,5 +565,11 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt String lockName = getLockName(value); return new RedissonLock(commandExecutor, lockName); } + + @Override + public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, + String pattern) { + throw new UnsupportedOperationException(); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index a4bdbe431..45c107455 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -128,6 +128,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return get(f); } + @Override public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(startPos); diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 93fd84f75..2f443d0d6 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -107,9 +107,9 @@ public class RedissonTopic implements RTopic { @Override public RFuture addListenerAsync(final MessageListener listener) { - PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); + final PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); RFuture future = subscribeService.subscribe(codec, name, pubSubListener); - RPromise result = new RedissonPromise(); + final RPromise result = new RedissonPromise(); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/redisson/src/main/java/org/redisson/ScanIterator.java b/redisson/src/main/java/org/redisson/ScanIterator.java index 9e3f7342f..3531dda26 100644 --- a/redisson/src/main/java/org/redisson/ScanIterator.java +++ b/redisson/src/main/java/org/redisson/ScanIterator.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -28,6 +29,8 @@ public interface ScanIterator { ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern); + RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern); + boolean remove(Object value); } diff --git a/redisson/src/main/java/org/redisson/api/RBucketReactive.java b/redisson/src/main/java/org/redisson/api/RBucketReactive.java index 0503a611d..6879be489 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBucketReactive.java @@ -45,6 +45,8 @@ public interface RBucketReactive extends RExpirableReactive { Publisher getAndSet(V newValue); Publisher get(); + + Publisher getAndDelete(); Publisher set(V value); diff --git a/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java b/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java index f3d9fe201..2e04082ea 100644 --- a/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java +++ b/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java @@ -19,6 +19,12 @@ import java.util.Collection; import org.reactivestreams.Publisher; +/** + * + * @author Nikita Koksharov + * + * @param + */ public interface RHyperLogLogReactive extends RExpirableReactive { Publisher add(V obj); diff --git a/redisson/src/main/java/org/redisson/api/RKeysReactive.java b/redisson/src/main/java/org/redisson/api/RKeysReactive.java index 744eef68b..de51ee2e7 100644 --- a/redisson/src/main/java/org/redisson/api/RKeysReactive.java +++ b/redisson/src/main/java/org/redisson/api/RKeysReactive.java @@ -16,11 +16,130 @@ package org.redisson.api; import java.util.Collection; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; +/** + * + * @author Nikita Koksharov + * + */ public interface RKeysReactive { + /** + * Move object to another database + * + * @param name of object + * @param database - Redis database number + * @return true if key was moved else false + */ + Publisher move(String name, int database); + + /** + * Transfer object from source Redis instance to destination Redis instance + * + * @param name of object + * @param host - destination host + * @param port - destination port + * @param database - destination database + * @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds + */ + Publisher migrate(String name, String host, int port, int database, long timeout); + + /** + * Copy object from source Redis instance to destination Redis instance + * + * @param name of object + * @param host - destination host + * @param port - destination port + * @param database - destination database + * @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds + */ + Publisher copy(String name, String host, int port, int database, long timeout); + + /** + * Set a timeout for object. After the timeout has expired, + * the key will automatically be deleted. + * + * @param name of object + * @param timeToLive - timeout before object will be deleted + * @param timeUnit - timeout time unit + * @return true if the timeout was set and false if not + */ + Publisher expire(String name, long timeToLive, TimeUnit timeUnit); + + /** + * Set an expire date for object. When expire date comes + * the key will automatically be deleted. + * + * @param name of object + * @param timestamp - expire date in milliseconds (Unix timestamp) + * @return true if the timeout was set and false if not + */ + Publisher expireAt(String name, long timestamp); + + /** + * Clear an expire timeout or expire date for object. + * + * @param name of object + * @return true if timeout was removed + * false if object does not exist or does not have an associated timeout + */ + Publisher clearExpire(String name); + + /** + * Rename object with oldName to newName + * only if new key is not exists + * + * @param oldName - old name of object + * @param newName - new name of object + * @return true if object has been renamed successfully and false otherwise + */ + Publisher renamenx(String oldName, String newName); + + /** + * Rename current object key to newName + * + * @param currentName - current name of object + * @param newName - new name of object + */ + Publisher rename(String currentName, String newName); + + /** + * Remaining time to live of Redisson object that has a timeout + * + * @param name of key + * @return time in milliseconds + * -2 if the key does not exist. + * -1 if the key exists but has no associated expire. + */ + Publisher remainTimeToLive(String name); + + /** + * Update the last access time of an object. + * + * @param names of keys + * @return count of objects were touched + */ + Publisher touch(String... names); + + /** + * Checks if provided keys exist + * + * @param names of keys + * @return amount of existing keys + */ + Publisher countExists(String... names); + + /** + * Get Redis object type by key + * + * @param key - name of key + * @return type of key + */ + Publisher getType(String key); + /** * Load keys in incrementally iterate mode. * @@ -105,6 +224,24 @@ public interface RKeysReactive { */ Publisher delete(String ... keys); + /** + * Delete multiple objects by name. + * Actual removal will happen later asynchronously. + *

+ * Requires Redis 4.0+ + * + * @param keys of objects + * @return number of removed keys + */ + Publisher unlink(String ... keys); + + /** + * Returns the number of keys in the currently-selected database + * + * @return count of keys + */ + Publisher count(); + /** * Delete all the keys of the currently selected database * diff --git a/redisson/src/main/java/org/redisson/api/RObjectReactive.java b/redisson/src/main/java/org/redisson/api/RObjectReactive.java index 634092686..088f3527e 100644 --- a/redisson/src/main/java/org/redisson/api/RObjectReactive.java +++ b/redisson/src/main/java/org/redisson/api/RObjectReactive.java @@ -36,9 +36,10 @@ public interface RObjectReactive { * @param host - destination host * @param port - destination port * @param database - destination database + * @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds * @return void */ - Publisher migrate(String host, int port, int database); + Publisher migrate(String host, int port, int database, long timeout); /** * Move object to another database in mode diff --git a/redisson/src/main/java/org/redisson/api/RTransactionReactive.java b/redisson/src/main/java/org/redisson/api/RTransactionReactive.java new file mode 100644 index 000000000..db6418687 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RTransactionReactive.java @@ -0,0 +1,157 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; + +/** + * Transaction object allows to execute transactions over Redisson objects. + * Uses locks for write operations and maintains data modification operations list till the commit/rollback operation. + *

+ * Transaction isolation level: READ_COMMITTED + * + * @author Nikita Koksharov + * + */ +public interface RTransactionReactive { + + /** + * Returns transactional object holder instance by name. + * + * @param type of value + * @param name - name of object + * @return Bucket object + */ + RBucketReactive getBucket(String name); + + /** + * Returns transactional object holder instance by name + * using provided codec for object. + * + * @param type of value + * @param name - name of object + * @param codec - codec for values + * @return Bucket object + */ + RBucketReactive getBucket(String name, Codec codec); + + /** + * Returns transactional map instance by name. + * + * @param type of key + * @param type of value + * @param name - name of object + * @return Map object + */ + RMapReactive getMap(String name); + + /** + * Returns transactional map instance by name + * using provided codec for both map keys and values. + * + * @param type of key + * @param type of value + * @param name - name of object + * @param codec - codec for keys and values + * @return Map object + */ + RMapReactive getMap(String name, Codec codec); + + /** + * Returns transactional set instance by name. + * + * @param type of value + * @param name - name of object + * @return Set object + */ + RSetReactive getSet(String name); + + /** + * Returns transactional set instance by name + * using provided codec for set objects. + * + * @param type of value + * @param name - name of object + * @param codec - codec for values + * @return Set object + */ + RSetReactive getSet(String name, Codec codec); + + /** + * Returns transactional set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String)}.

+ * + * @param type of value + * @param name - name of object + * @return SetCache object + */ + RSetCacheReactive getSetCache(String name); + + /** + * Returns transactional set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param type of value + * @param name - name of object + * @param codec - codec for values + * @return SetCache object + */ + RSetCacheReactive getSetCache(String name, Codec codec); + + /** + * Returns transactional map-based cache instance by name. + * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String)}.

+ * + * @param type of key + * @param type of value + * @param name - name of object + * @return MapCache object + */ + RMapCacheReactive getMapCache(String name); + + /** + * Returns transactional map-based cache instance by name + * using provided codec for both cache keys and values. + * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}. + * + * @param type of key + * @param type of value + * @param name - object name + * @param codec - codec for keys and values + * @return MapCache object + */ + RMapCacheReactive getMapCache(String name, Codec codec); + + /** + * Commits all changes made on this transaction. + */ + Publisher commit(); + + /** + * Rollback all changes made on this transaction. + */ + Publisher rollback(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index b8d75ded4..42803b932 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -501,6 +501,14 @@ public interface RedissonReactiveClient { */ RScriptReactive getScript(); + /** + * Creates transaction with READ_COMMITTED isolation level. + * + * @param options - transaction configuration + * @return Transaction object + */ + RTransactionReactive createTransaction(TransactionOptions options); + /** * Return batch object which executes group of * command in pipeline. diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java index eead378f9..e359e68d4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java @@ -35,9 +35,14 @@ public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive impl private final RAtomicDoubleAsync instance; public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonAtomicDouble(commandExecutor, name); + this(commandExecutor, name, new RedissonAtomicDouble(commandExecutor, name)); } + + public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicDoubleAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; + } + @Override public Publisher addAndGet(final double delta) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java index 21ae86ef1..351df6f21 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java @@ -35,10 +35,14 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem private final RAtomicLongAsync instance; public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonAtomicLong(commandExecutor, name); + this(commandExecutor, name, new RedissonAtomicLong(commandExecutor, name)); } + public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicLongAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; + } + @Override public Publisher addAndGet(final long delta) { return reactive(new Supplier>() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java index 32ede4516..aee483fb9 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java @@ -42,12 +42,12 @@ abstract class RedissonBaseMultimapReactive extends RedissonExpirableReact private final RMultimap instance; public RedissonBaseMultimapReactive(RMultimap instance, CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); + super(commandExecutor, name, instance); this.instance = instance; } public RedissonBaseMultimapReactive(RMultimap instance, Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); + super(codec, commandExecutor, name, instance); this.instance = instance; } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java index 48e14e936..1a63e53ef 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java @@ -19,6 +19,7 @@ import java.util.BitSet; import org.reactivestreams.Publisher; import org.redisson.RedissonBitSet; +import org.redisson.api.RBitSetAsync; import org.redisson.api.RBitSetReactive; import org.redisson.api.RFuture; import org.redisson.client.codec.BitSetCodec; @@ -35,11 +36,15 @@ import reactor.rx.Streams; */ public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive { - private final RedissonBitSet instance; + private final RBitSetAsync instance; public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); - this.instance = new RedissonBitSet(connectionManager, name); + this(connectionManager, name, new RedissonBitSet(connectionManager, name)); + } + + public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name, RBitSetAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; } public Publisher get(final long bitIndex) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java index 20b2f2d45..22eeb6b5d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java @@ -38,13 +38,21 @@ public class RedissonBucketReactive extends RedissonExpirableReactive impleme private final RBucketAsync instance; public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); - instance = new RedissonBucket(connectionManager, name); + this(connectionManager, name, new RedissonBucket(connectionManager, name)); + } + + public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name, RBucketAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; } public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { - super(codec, connectionManager, name); - instance = new RedissonBucket(codec, connectionManager, name); + this(codec, connectionManager, name, new RedissonBucket(codec, connectionManager, name)); + } + + public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RBucketAsync instance) { + super(codec, connectionManager, name, instance); + this.instance = instance; } @Override @@ -56,6 +64,16 @@ public class RedissonBucketReactive extends RedissonExpirableReactive impleme } }); } + + @Override + public Publisher getAndDelete() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getAndDeleteAsync(); + } + }); + } @Override public Publisher set(final V value) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java index 5e3fcf3c6..05d96898c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java @@ -19,12 +19,14 @@ import java.util.Date; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; +import org.redisson.api.RExpirableAsync; import org.redisson.api.RExpirableReactive; +import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; +import reactor.fn.Supplier; + /** * * @author Nikita Koksharov @@ -32,22 +34,32 @@ import org.redisson.command.CommandReactiveExecutor; */ abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive { - RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); + RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { + super(connectionManager, name, instance); } - RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { - super(codec, connectionManager, name); + RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { + super(codec, connectionManager, name, instance); } @Override - public Publisher expire(long timeToLive, TimeUnit timeUnit) { - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toMillis(timeToLive)); + public Publisher expire(final long timeToLive, final TimeUnit timeUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAsync(timeToLive, timeUnit); + } + }); } @Override - public Publisher expireAt(long timestamp) { - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIREAT, getName(), timestamp); + public Publisher expireAt(final long timestamp) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAtAsync(timestamp); + } + }); } @Override @@ -57,12 +69,22 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen @Override public Publisher clearExpire() { - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.clearExpireAsync(); + } + }); } @Override public Publisher remainTimeToLive() { - return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.PTTL, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.remainTimeToLiveAsync(); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java index 4ee717b9d..e190f8e18 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java @@ -21,11 +21,16 @@ import java.util.Collection; import java.util.List; import org.reactivestreams.Publisher; +import org.redisson.RedissonHyperLogLog; +import org.redisson.api.RFuture; +import org.redisson.api.RHyperLogLogAsync; import org.redisson.api.RHyperLogLogReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; +import reactor.fn.Supplier; + /** * * @author Nikita Koksharov @@ -34,46 +39,66 @@ import org.redisson.command.CommandReactiveExecutor; */ public class RedissonHyperLogLogReactive extends RedissonExpirableReactive implements RHyperLogLogReactive { + private final RHyperLogLogAsync instance; + public RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); + super(commandExecutor, name, new RedissonHyperLogLog(commandExecutor, name)); + this.instance = (RHyperLogLogAsync) super.instance; } - + public RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); + super(codec, commandExecutor, name, new RedissonHyperLogLog(commandExecutor, name)); + this.instance = (RHyperLogLogAsync) super.instance; } @Override - public Publisher add(V obj) { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), encode(obj)); + public Publisher add(final V obj) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(obj); + } + }); } @Override - public Publisher addAll(Collection objects) { - List args = new ArrayList(objects.size() + 1); - args.add(getName()); - encode(args, objects); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), args.toArray()); + public Publisher addAll(final Collection objects) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAllAsync(objects); + } + }); } @Override public Publisher count() { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countAsync(); + } + }); } @Override - public Publisher countWith(String... otherLogNames) { - List args = new ArrayList(otherLogNames.length + 1); - args.add(getName()); - args.addAll(Arrays.asList(otherLogNames)); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, args.toArray()); + public Publisher countWith(final String... otherLogNames) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countWithAsync(otherLogNames); + } + }); } @Override - public Publisher mergeWith(String... otherLogNames) { - List args = new ArrayList(otherLogNames.length + 1); - args.add(getName()); - args.addAll(Arrays.asList(otherLogNames)); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFMERGE, args.toArray()); + public Publisher mergeWith(final String... otherLogNames) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.mergeWithAsync(otherLogNames); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index 16f61b091..08dfe44b4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -18,6 +18,7 @@ package org.redisson.reactive; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -25,6 +26,7 @@ import org.reactivestreams.Subscription; import org.redisson.RedissonKeys; import org.redisson.api.RFuture; import org.redisson.api.RKeysReactive; +import org.redisson.api.RType; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; @@ -220,4 +222,144 @@ public class RedissonKeysReactive implements RKeysReactive { }); } + @Override + public Publisher move(final String name, final int database) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.moveAsync(name, database); + } + }); + } + + @Override + public Publisher migrate(final String name, final String host, final int port, final int database, final long timeout) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.migrateAsync(name, host, port, database, timeout); + } + }); + } + + @Override + public Publisher copy(final String name, final String host, final int port, final int database, final long timeout) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.copyAsync(name, host, port, database, timeout); + } + }); + } + + @Override + public Publisher expire(final String name, final long timeToLive, final TimeUnit timeUnit) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAsync(name, timeToLive, timeUnit); + } + }); + } + + @Override + public Publisher expireAt(final String name, final long timestamp) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAtAsync(name, timestamp); + } + }); + } + + @Override + public Publisher clearExpire(final String name) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.clearExpireAsync(name); + } + }); + } + + @Override + public Publisher renamenx(final String oldName, final String newName) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renamenxAsync(oldName, newName); + } + }); + } + + @Override + public Publisher rename(final String currentName, final String newName) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renameAsync(currentName, newName); + } + }); + } + + @Override + public Publisher remainTimeToLive(final String name) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.remainTimeToLiveAsync(name); + } + }); + } + + @Override + public Publisher touch(final String... names) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.touchAsync(names); + } + }); + } + + @Override + public Publisher countExists(final String... names) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countExistsAsync(names); + } + }); + } + + @Override + public Publisher getType(final String key) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getTypeAsync(key); + } + }); + } + + @Override + public Publisher unlink(final String... keys) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.unlinkAsync(keys); + } + }); + } + + @Override + public Publisher count() { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countAsync(); + } + }); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index fb550d25e..20674ad5d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -29,6 +29,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.redisson.RedissonList; import org.redisson.api.RFuture; +import org.redisson.api.RListAsync; import org.redisson.api.RListReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; @@ -51,16 +52,16 @@ import reactor.rx.subscription.ReactiveSubscription; */ public class RedissonListReactive extends RedissonExpirableReactive implements RListReactive { - private final RedissonList instance; + private final RListAsync instance; public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonList(commandExecutor, name, null); + super(commandExecutor, name, new RedissonList(commandExecutor, name, null)); + this.instance = (RListAsync) super.instance; } public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonList(codec, commandExecutor, name, null); + super(codec, commandExecutor, name, new RedissonList(codec, commandExecutor, name, null)); + this.instance = (RListAsync) super.instance; } @Override @@ -299,7 +300,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement return reactive(new Supplier>() { @Override public RFuture get() { - return instance.indexOfAsync(o, new LongReplayConvertor()); + return ((RedissonList)instance).indexOfAsync(o, new LongReplayConvertor()); } }); } @@ -309,7 +310,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement return reactive(new Supplier>() { @Override public RFuture get() { - return instance.lastIndexOfAsync(o, new LongReplayConvertor()); + return ((RedissonList)instance).lastIndexOfAsync(o, new LongReplayConvertor()); } }); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java index 02f3a6fa6..32c2ccc5c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java @@ -22,7 +22,6 @@ import org.redisson.RedissonLock; import org.redisson.api.RFuture; import org.redisson.api.RLockAsync; import org.redisson.api.RLockReactive; -import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; @@ -37,12 +36,12 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R private final RLockAsync instance; public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); - instance = createLock(connectionManager, name); + this(connectionManager, name, new RedissonLock(connectionManager, name)); } - protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { - return new RedissonLock(commandExecutor, name); + public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, RLockAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 7c28fdb21..a96493683 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -64,13 +64,21 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im private final RMapCacheAsync mapCache; public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(commandExecutor, name); - this.mapCache = new RedissonMapCache(evictionScheduler, commandExecutor, name, null, options); + this(commandExecutor, name, options, new RedissonMapCache(evictionScheduler, commandExecutor, name, null, options)); } + public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapCacheAsync mapCache) { + super(commandExecutor, name, mapCache); + this.mapCache = mapCache; + } + public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(codec, commandExecutor, name); - this.mapCache = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, options); + this(codec, commandExecutor, name, options, new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, options)); + } + + public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapCacheAsync mapCache) { + super(codec, commandExecutor, name, mapCache); + this.mapCache = mapCache; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index f4f2a8dbb..72ee38a5c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; import java.util.Map.Entry; @@ -55,13 +54,21 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme private final RMapAsync instance; public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name, null, options); + this(commandExecutor, name, options, new RedissonMap(commandExecutor, name, null, options)); + } + + public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; } public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(codec, commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name, null, options); + this(codec, commandExecutor, name, options, new RedissonMap(codec, commandExecutor, name, null, options)); + } + + public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 69180d102..1604b52f4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -20,10 +20,10 @@ import java.util.Collection; import org.reactivestreams.Publisher; import org.redisson.RedissonReference; +import org.redisson.api.RExpirableAsync; import org.redisson.api.RFuture; import org.redisson.api.RObjectReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; import org.redisson.misc.RedissonObjectFactory; @@ -43,19 +43,21 @@ abstract class RedissonObjectReactive implements RObjectReactive { final CommandReactiveExecutor commandExecutor; private final String name; final Codec codec; + protected RExpirableAsync instance; - public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { this.codec = codec; this.name = name; this.commandExecutor = commandExecutor; + this.instance = instance; } public Publisher reactive(Supplier> supplier) { return commandExecutor.reactive(supplier); } - public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name) { - this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); + public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { + this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance); } protected Stream newSucceeded(V result) { @@ -124,33 +126,63 @@ abstract class RedissonObjectReactive implements RObjectReactive { } @Override - public Publisher rename(String newName) { - return commandExecutor.writeReactive(getName(), RedisCommands.RENAME, getName(), newName); + public Publisher rename(final String newName) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renameAsync(newName); + } + }); } @Override - public Publisher migrate(String host, int port, int database) { - return commandExecutor.writeReactive(getName(), RedisCommands.MIGRATE, host, port, getName(), database); + public Publisher migrate(final String host, final int port, final int database, final long timeout) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.migrateAsync(host, port, database, timeout); + } + }); } @Override - public Publisher move(int database) { - return commandExecutor.writeReactive(getName(), RedisCommands.MOVE, getName(), database); + public Publisher move(final int database) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.moveAsync(database); + } + }); } @Override - public Publisher renamenx(String newName) { - return commandExecutor.writeReactive(getName(), RedisCommands.RENAMENX, getName(), newName); + public Publisher renamenx(final String newName) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renamenxAsync(newName); + } + }); } @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.deleteAsync(); + } + }); } @Override public Publisher isExists() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.EXISTS, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.isExistsAsync(); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java index f6047026e..55d22101a 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java @@ -40,8 +40,8 @@ public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableR private final RPermitExpirableSemaphoreAsync instance; public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { - super(connectionManager, name); - instance = new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub); + super(connectionManager, name, new RedissonPermitExpirableSemaphore(connectionManager, name, semaphorePubSub)); + instance = (RPermitExpirableSemaphoreAsync) super.instance; } protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java index cdb94b615..80d63d706 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java @@ -16,11 +16,9 @@ package org.redisson.reactive; import org.redisson.RedissonReadWriteLock; -import org.redisson.api.RLockAsync; import org.redisson.api.RLockReactive; import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLockReactive; -import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandReactiveExecutor; /** @@ -33,28 +31,18 @@ public class RedissonReadWriteLockReactive extends RedissonExpirableReactive imp private final RReadWriteLock instance; public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - this.instance = new RedissonReadWriteLock(commandExecutor, name); + super(commandExecutor, name, new RedissonReadWriteLock(commandExecutor, name)); + this.instance = (RReadWriteLock) super.instance; } @Override public RLockReactive readLock() { - return new RedissonLockReactive(commandExecutor, getName()) { - @Override - protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { - return instance.readLock(); - } - }; + return new RedissonLockReactive(commandExecutor, getName(), instance.readLock()); } @Override public RLockReactive writeLock() { - return new RedissonLockReactive(commandExecutor, getName()) { - @Override - protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { - return instance.writeLock(); - } - }; + return new RedissonLockReactive(commandExecutor, getName(), instance.writeLock()); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 211446bab..aa54a14cc 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; @@ -47,13 +46,21 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv private final RScoredSortedSetAsync instance; public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonScoredSortedSet(commandExecutor, name, null); + this(commandExecutor, name, new RedissonScoredSortedSet(commandExecutor, name, null)); } + public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; + } + public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonScoredSortedSet(codec, commandExecutor, name, null); + this(codec, commandExecutor, name, new RedissonScoredSortedSet(codec, commandExecutor, name, null)); + } + + public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java index cf88a2489..473819eaa 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java @@ -37,8 +37,8 @@ public class RedissonSemaphoreReactive extends RedissonExpirableReactive impleme private final RSemaphoreAsync instance; public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { - super(connectionManager, name); - instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub); + super(connectionManager, name, new RedissonSemaphore(connectionManager, name, semaphorePubSub)); + instance = (RSemaphoreAsync) super.instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 96d328a0b..8ea1ffb0b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -24,7 +23,9 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonSetCache; +import org.redisson.ScanIterator; import org.redisson.api.RFuture; +import org.redisson.api.RSetCacheAsync; import org.redisson.api.RSetCacheReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -59,18 +60,27 @@ import reactor.fn.Supplier; */ public class RedissonSetCacheReactive extends RedissonExpirableReactive implements RSetCacheReactive { - private final RedissonSetCache instance; + private final RSetCacheAsync instance; public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); + this(commandExecutor, name, new RedissonSetCache(evictionScheduler, commandExecutor, name, null)); + } + + public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; } public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); + this(codec, commandExecutor, name, new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null)); } + public RedissonSetCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; + } + + @Override public Publisher size() { return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName()); @@ -90,7 +100,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple return reactive(new Supplier>>() { @Override public RFuture> get() { - return instance.scanIteratorAsync(getName(), client, startPos, null); + return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null); } }); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 4a9c621af..12d353761 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -25,6 +24,7 @@ import java.util.Set; import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RFuture; +import org.redisson.api.RSetAsync; import org.redisson.api.RSetReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -45,17 +45,26 @@ import reactor.fn.Supplier; */ public class RedissonSetReactive extends RedissonExpirableReactive implements RSetReactive { - private final RedissonSet instance; + private final RSetAsync instance; public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonSet(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null); + this(commandExecutor, name, new RedissonSet(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null)); + } + + public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name, RSetAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; } public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonSet(codec, commandExecutor, name, null); + this(codec, commandExecutor, name, new RedissonSet(codec, commandExecutor, name, null)); } + + public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; + } + @Override public Publisher addAll(Publisher c) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java new file mode 100644 index 000000000..f234ebc3a --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -0,0 +1,119 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import org.reactivestreams.Publisher; +import org.redisson.api.RBucketReactive; +import org.redisson.api.RFuture; +import org.redisson.api.RMapCacheReactive; +import org.redisson.api.RMapReactive; +import org.redisson.api.RSetCacheReactive; +import org.redisson.api.RSetReactive; +import org.redisson.api.RTransaction; +import org.redisson.api.RTransactionReactive; +import org.redisson.api.TransactionOptions; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.transaction.RedissonTransaction; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonTransactionReactive implements RTransactionReactive { + + private final RTransaction transaction; + private final CommandReactiveExecutor executorService; + + public RedissonTransactionReactive(CommandReactiveExecutor executorService, TransactionOptions options) { + this.transaction = new RedissonTransaction(executorService, options); + this.executorService = executorService; + } + + @Override + public RBucketReactive getBucket(String name) { + return new RedissonBucketReactive(executorService, name, transaction.getBucket(name)); + } + + @Override + public RBucketReactive getBucket(String name, Codec codec) { + return new RedissonBucketReactive(codec, executorService, name, transaction.getBucket(name, codec)); + } + + @Override + public RMapReactive getMap(String name) { + return new RedissonMapReactive(executorService, name, null, transaction.getMap(name)); + } + + @Override + public RMapReactive getMap(String name, Codec codec) { + return new RedissonMapReactive(codec, executorService, name, null, transaction.getMap(name, codec)); + } + + @Override + public RMapCacheReactive getMapCache(String name, Codec codec) { + return new RedissonMapCacheReactive(codec, executorService, name, null, transaction.getMapCache(name, codec)); + } + + @Override + public RMapCacheReactive getMapCache(String name) { + return new RedissonMapCacheReactive(executorService, name, null, transaction.getMapCache(name)); + } + + @Override + public RSetReactive getSet(String name) { + return new RedissonSetReactive(executorService, name, transaction.getSet(name)); + } + + @Override + public RSetReactive getSet(String name, Codec codec) { + return new RedissonSetReactive(codec, executorService, name, transaction.getSet(name, codec)); + } + + @Override + public RSetCacheReactive getSetCache(String name) { + return new RedissonSetCacheReactive(executorService, name, transaction.getSetCache(name)); + } + + @Override + public RSetCacheReactive getSetCache(String name, Codec codec) { + return new RedissonSetCacheReactive(codec, executorService, name, transaction.getSetCache(name, codec)); + } + + @Override + public Publisher commit() { + return new NettyFuturePublisher(new Supplier>() { + @Override + public RFuture get() { + return transaction.commitAsync(); + } + }); + } + + @Override + public Publisher rollback() { + return new NettyFuturePublisher(new Supplier>() { + @Override + public RFuture get() { + return transaction.rollbackAsync(); + } + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index df9efb475..2d82ec049 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -510,7 +510,7 @@ public class BaseTransactionalMap { executeLocked(result, new Runnable() { @Override public void run() { - AtomicLong counter = new AtomicLong(); + final AtomicLong counter = new AtomicLong(); List keyList = Arrays.asList(keys); for (Iterator iterator = keyList.iterator(); iterator.hasNext();) { K key = iterator.next(); diff --git a/redisson/src/main/java/org/redisson/transaction/HashKey.java b/redisson/src/main/java/org/redisson/transaction/HashKey.java index 97be23502..1ec9a2921 100644 --- a/redisson/src/main/java/org/redisson/transaction/HashKey.java +++ b/redisson/src/main/java/org/redisson/transaction/HashKey.java @@ -1,3 +1,18 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.transaction; import org.redisson.client.codec.Codec; diff --git a/redisson/src/main/java/org/redisson/transaction/HashValue.java b/redisson/src/main/java/org/redisson/transaction/HashValue.java index 7977a2de2..0406446ad 100644 --- a/redisson/src/main/java/org/redisson/transaction/HashValue.java +++ b/redisson/src/main/java/org/redisson/transaction/HashValue.java @@ -1,3 +1,18 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.transaction; import java.util.ArrayList; diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 8fd79fdd5..0f82016b9 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -176,13 +175,14 @@ public class RedissonTransaction implements RTransaction { checkTimeout(); - CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager()); + final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager()); for (TransactionalOperation transactionalOperation : operations) { + System.out.println("transactionalOperation " + transactionalOperation); transactionalOperation.commit(transactionExecutor); } - String id = generateId(); - RPromise result = new RedissonPromise(); + final String id = generateId(); + final RPromise result = new RedissonPromise(); RFuture> future = disableLocalCacheAsync(id); future.addListener(new FutureListener>() { @Override @@ -192,7 +192,7 @@ public class RedissonTransaction implements RTransaction { return; } - Map hashes = future.getNow(); + final Map hashes = future.getNow(); try { checkTimeout(); } catch (TransactionTimeoutException e) { @@ -420,13 +420,13 @@ public class RedissonTransaction implements RTransaction { return hashes; } - private RFuture> disableLocalCacheAsync(String requestId) { + private RFuture> disableLocalCacheAsync(final String requestId) { if (localCaches.isEmpty()) { - return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + return RedissonPromise.newSucceededFuture(Collections.emptyMap()); } - RPromise> result = new RedissonPromise>(); - Map hashes = new HashMap(localCaches.size()); + final RPromise> result = new RedissonPromise>(); + final Map hashes = new HashMap(localCaches.size()); RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); for (TransactionalOperation transactionalOperation : operations) { if (localCaches.contains(transactionalOperation.getName())) { @@ -459,14 +459,14 @@ public class RedissonTransaction implements RTransaction { return; } - CountableListener> listener = + final CountableListener> listener = new CountableListener>(result, hashes); listener.setCounter(hashes.size()); RPromise subscriptionFuture = new RedissonPromise(); - CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null); + final CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null); subscribedFutures.setCounter(hashes.size()); - List> topics = new ArrayList>(); + final List> topics = new ArrayList>(); for (final Entry entry : hashes.entrySet()) { final String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX); RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, @@ -588,7 +588,7 @@ public class RedissonTransaction implements RTransaction { transactionalOperation.rollback(executorService); } - RPromise result = new RedissonPromise(); + final RPromise result = new RedissonPromise(); RFuture future = executorService.executeAsync(BatchOptions.defaults()); future.addListener(new FutureListener() { @Override diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java new file mode 100644 index 000000000..1ba055b6c --- /dev/null +++ b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java @@ -0,0 +1,148 @@ +package org.redisson.transaction; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.BaseReactiveTest; +import org.redisson.api.RBucketReactive; +import org.redisson.api.RTransactionReactive; +import org.redisson.api.TransactionOptions; + +public class RedissonTransactionalBucketReactiveTest extends BaseReactiveTest { + + @Test + public void testTimeout() throws InterruptedException { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults().timeout(3, TimeUnit.SECONDS)); + RBucketReactive bucket = transaction.getBucket("test"); + sync(bucket.set("234")); + + Thread.sleep(3000); + + try { + sync(transaction.commit()); + Assert.fail(); + } catch (TransactionException e) { + // skip + } + + Thread.sleep(1000); + + assertThat(sync(b.get())).isEqualTo("123"); + } + + @Test + public void testSet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + sync(bucket.set("234")); + assertThat(sync(bucket.get())).isEqualTo("234"); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("234"); + } + + @Test + public void testGetAndSet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.getAndSet("0"))).isEqualTo("123"); + assertThat(sync(bucket.get())).isEqualTo("0"); + assertThat(sync(bucket.getAndSet("324"))).isEqualTo("0"); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("324"); + } + + @Test + public void testCompareAndSet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.compareAndSet("0", "434"))).isFalse(); + assertThat(sync(bucket.get())).isEqualTo("123"); + assertThat(sync(bucket.compareAndSet("123", "232"))).isTrue(); + assertThat(sync(bucket.get())).isEqualTo("232"); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("232"); + } + + @Test + public void testTrySet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.trySet("0"))).isFalse(); + assertThat(sync(bucket.delete())).isTrue(); + assertThat(sync(bucket.trySet("324"))).isTrue(); + assertThat(sync(bucket.trySet("43"))).isFalse(); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("324"); + } + + @Test + public void testGetAndRemove() { + RBucketReactive m = redisson.getBucket("test"); + sync(m.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive set = transaction.getBucket("test"); + assertThat(sync(set.get())).isEqualTo("123"); + assertThat(sync(set.size())).isEqualTo(5); + assertThat(sync(set.getAndDelete())).isEqualTo("123"); + assertThat(sync(set.size())).isEqualTo(0); + assertThat(sync(set.get())).isNull(); + assertThat(sync(set.getAndDelete())).isNull(); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(0); + assertThat(sync(m.get())).isNull(); + } + + @Test + public void testRollback() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("1234")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.get())).isEqualTo("1234"); + assertThat(sync(bucket.getAndDelete())).isEqualTo("1234"); + + assertThat(sync(b.get())).isEqualTo("1234"); + + sync(transaction.rollback()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + + assertThat(sync(b.get())).isEqualTo("1234"); + } + + +}