From c3655e5ef41d46b1c91181e32ed5d5e68026361c Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Apr 2018 18:27:34 +0300 Subject: [PATCH] RedissonReactive.createTransaction method added. #1372 --- .../org/redisson/RedissonHyperLogLog.java | 4 +- .../java/org/redisson/RedissonReactive.java | 8 + .../main/java/org/redisson/RedissonSet.java | 6 + .../java/org/redisson/RedissonSetCache.java | 1 + .../main/java/org/redisson/RedissonTopic.java | 4 +- .../main/java/org/redisson/ScanIterator.java | 3 + .../org/redisson/api/RBucketReactive.java | 2 + .../redisson/api/RHyperLogLogReactive.java | 6 + .../java/org/redisson/api/RKeysReactive.java | 137 +++++++++++++++ .../org/redisson/api/RObjectReactive.java | 3 +- .../redisson/api/RTransactionReactive.java | 157 ++++++++++++++++++ .../redisson/api/RedissonReactiveClient.java | 8 + .../RedissonAtomicDoubleReactive.java | 9 +- .../reactive/RedissonAtomicLongReactive.java | 8 +- .../RedissonBaseMultimapReactive.java | 4 +- .../reactive/RedissonBitSetReactive.java | 11 +- .../reactive/RedissonBucketReactive.java | 26 ++- .../reactive/RedissonExpirableReactive.java | 46 +++-- .../reactive/RedissonHyperLogLogReactive.java | 67 +++++--- .../reactive/RedissonKeysReactive.java | 142 ++++++++++++++++ .../reactive/RedissonListReactive.java | 15 +- .../reactive/RedissonLockReactive.java | 9 +- .../reactive/RedissonMapCacheReactive.java | 16 +- .../reactive/RedissonMapReactive.java | 17 +- .../reactive/RedissonObjectReactive.java | 60 +++++-- ...issonPermitExpirableSemaphoreReactive.java | 4 +- .../RedissonReadWriteLockReactive.java | 20 +-- .../RedissonScoredSortedSetReactive.java | 17 +- .../reactive/RedissonSemaphoreReactive.java | 4 +- .../reactive/RedissonSetCacheReactive.java | 24 ++- .../reactive/RedissonSetReactive.java | 21 ++- .../reactive/RedissonTransactionReactive.java | 119 +++++++++++++ .../transaction/BaseTransactionalMap.java | 2 +- .../org/redisson/transaction/HashKey.java | 15 ++ .../org/redisson/transaction/HashValue.java | 15 ++ .../transaction/RedissonTransaction.java | 26 +-- ...dissonTransactionalBucketReactiveTest.java | 148 +++++++++++++++++ 37 files changed, 1046 insertions(+), 138 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RTransactionReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java create mode 100644 redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java diff --git a/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java b/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java index 0cac38234..f28ba0fec 100644 --- a/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java +++ b/redisson/src/main/java/org/redisson/RedissonHyperLogLog.java @@ -34,11 +34,11 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RedissonHyperLogLog extends RedissonExpirable implements RHyperLogLog { - protected RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) { + public RedissonHyperLogLog(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } - protected RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + public RedissonHyperLogLog(Codec codec, CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index b2129a387..62a91519f 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -52,7 +52,9 @@ import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTopicReactive; +import org.redisson.api.RTransactionReactive; import org.redisson.api.RedissonReactiveClient; +import org.redisson.api.TransactionOptions; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.codec.ReferenceCodecProvider; @@ -88,6 +90,7 @@ import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetMultimapReactive; import org.redisson.reactive.RedissonSetReactive; import org.redisson.reactive.RedissonTopicReactive; +import org.redisson.reactive.RedissonTransactionReactive; /** * Main infrastructure class allows to get access @@ -411,5 +414,10 @@ public class RedissonReactive implements RedissonReactiveClient { public RMapReactive getMap(String name, Codec codec, MapOptions options) { return new RedissonMapReactive(codec, commandExecutor, name, options); } + + @Override + public RTransactionReactive createTransaction(TransactionOptions options) { + return new RedissonTransactionReactive(commandExecutor, options); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index d2fa8cd32..07fe4882b 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -565,5 +565,11 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt String lockName = getLockName(value); return new RedissonLock(commandExecutor, lockName); } + + @Override + public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, + String pattern) { + throw new UnsupportedOperationException(); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index a4bdbe431..45c107455 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -128,6 +128,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return get(f); } + @Override public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(startPos); diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 93fd84f75..2f443d0d6 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -107,9 +107,9 @@ public class RedissonTopic implements RTopic { @Override public RFuture addListenerAsync(final MessageListener listener) { - PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); + final PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); RFuture future = subscribeService.subscribe(codec, name, pubSubListener); - RPromise result = new RedissonPromise(); + final RPromise result = new RedissonPromise(); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/redisson/src/main/java/org/redisson/ScanIterator.java b/redisson/src/main/java/org/redisson/ScanIterator.java index 9e3f7342f..3531dda26 100644 --- a/redisson/src/main/java/org/redisson/ScanIterator.java +++ b/redisson/src/main/java/org/redisson/ScanIterator.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -28,6 +29,8 @@ public interface ScanIterator { ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern); + RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern); + boolean remove(Object value); } diff --git a/redisson/src/main/java/org/redisson/api/RBucketReactive.java b/redisson/src/main/java/org/redisson/api/RBucketReactive.java index 0503a611d..6879be489 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBucketReactive.java @@ -45,6 +45,8 @@ public interface RBucketReactive extends RExpirableReactive { Publisher getAndSet(V newValue); Publisher get(); + + Publisher getAndDelete(); Publisher set(V value); diff --git a/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java b/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java index f3d9fe201..2e04082ea 100644 --- a/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java +++ b/redisson/src/main/java/org/redisson/api/RHyperLogLogReactive.java @@ -19,6 +19,12 @@ import java.util.Collection; import org.reactivestreams.Publisher; +/** + * + * @author Nikita Koksharov + * + * @param + */ public interface RHyperLogLogReactive extends RExpirableReactive { Publisher add(V obj); diff --git a/redisson/src/main/java/org/redisson/api/RKeysReactive.java b/redisson/src/main/java/org/redisson/api/RKeysReactive.java index 744eef68b..de51ee2e7 100644 --- a/redisson/src/main/java/org/redisson/api/RKeysReactive.java +++ b/redisson/src/main/java/org/redisson/api/RKeysReactive.java @@ -16,11 +16,130 @@ package org.redisson.api; import java.util.Collection; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; +/** + * + * @author Nikita Koksharov + * + */ public interface RKeysReactive { + /** + * Move object to another database + * + * @param name of object + * @param database - Redis database number + * @return true if key was moved else false + */ + Publisher move(String name, int database); + + /** + * Transfer object from source Redis instance to destination Redis instance + * + * @param name of object + * @param host - destination host + * @param port - destination port + * @param database - destination database + * @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds + */ + Publisher migrate(String name, String host, int port, int database, long timeout); + + /** + * Copy object from source Redis instance to destination Redis instance + * + * @param name of object + * @param host - destination host + * @param port - destination port + * @param database - destination database + * @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds + */ + Publisher copy(String name, String host, int port, int database, long timeout); + + /** + * Set a timeout for object. After the timeout has expired, + * the key will automatically be deleted. + * + * @param name of object + * @param timeToLive - timeout before object will be deleted + * @param timeUnit - timeout time unit + * @return true if the timeout was set and false if not + */ + Publisher expire(String name, long timeToLive, TimeUnit timeUnit); + + /** + * Set an expire date for object. When expire date comes + * the key will automatically be deleted. + * + * @param name of object + * @param timestamp - expire date in milliseconds (Unix timestamp) + * @return true if the timeout was set and false if not + */ + Publisher expireAt(String name, long timestamp); + + /** + * Clear an expire timeout or expire date for object. + * + * @param name of object + * @return true if timeout was removed + * false if object does not exist or does not have an associated timeout + */ + Publisher clearExpire(String name); + + /** + * Rename object with oldName to newName + * only if new key is not exists + * + * @param oldName - old name of object + * @param newName - new name of object + * @return true if object has been renamed successfully and false otherwise + */ + Publisher renamenx(String oldName, String newName); + + /** + * Rename current object key to newName + * + * @param currentName - current name of object + * @param newName - new name of object + */ + Publisher rename(String currentName, String newName); + + /** + * Remaining time to live of Redisson object that has a timeout + * + * @param name of key + * @return time in milliseconds + * -2 if the key does not exist. + * -1 if the key exists but has no associated expire. + */ + Publisher remainTimeToLive(String name); + + /** + * Update the last access time of an object. + * + * @param names of keys + * @return count of objects were touched + */ + Publisher touch(String... names); + + /** + * Checks if provided keys exist + * + * @param names of keys + * @return amount of existing keys + */ + Publisher countExists(String... names); + + /** + * Get Redis object type by key + * + * @param key - name of key + * @return type of key + */ + Publisher getType(String key); + /** * Load keys in incrementally iterate mode. * @@ -105,6 +224,24 @@ public interface RKeysReactive { */ Publisher delete(String ... keys); + /** + * Delete multiple objects by name. + * Actual removal will happen later asynchronously. + *

+ * Requires Redis 4.0+ + * + * @param keys of objects + * @return number of removed keys + */ + Publisher unlink(String ... keys); + + /** + * Returns the number of keys in the currently-selected database + * + * @return count of keys + */ + Publisher count(); + /** * Delete all the keys of the currently selected database * diff --git a/redisson/src/main/java/org/redisson/api/RObjectReactive.java b/redisson/src/main/java/org/redisson/api/RObjectReactive.java index 634092686..088f3527e 100644 --- a/redisson/src/main/java/org/redisson/api/RObjectReactive.java +++ b/redisson/src/main/java/org/redisson/api/RObjectReactive.java @@ -36,9 +36,10 @@ public interface RObjectReactive { * @param host - destination host * @param port - destination port * @param database - destination database + * @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds * @return void */ - Publisher migrate(String host, int port, int database); + Publisher migrate(String host, int port, int database, long timeout); /** * Move object to another database in mode diff --git a/redisson/src/main/java/org/redisson/api/RTransactionReactive.java b/redisson/src/main/java/org/redisson/api/RTransactionReactive.java new file mode 100644 index 000000000..db6418687 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RTransactionReactive.java @@ -0,0 +1,157 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; + +/** + * Transaction object allows to execute transactions over Redisson objects. + * Uses locks for write operations and maintains data modification operations list till the commit/rollback operation. + *

+ * Transaction isolation level: READ_COMMITTED + * + * @author Nikita Koksharov + * + */ +public interface RTransactionReactive { + + /** + * Returns transactional object holder instance by name. + * + * @param type of value + * @param name - name of object + * @return Bucket object + */ + RBucketReactive getBucket(String name); + + /** + * Returns transactional object holder instance by name + * using provided codec for object. + * + * @param type of value + * @param name - name of object + * @param codec - codec for values + * @return Bucket object + */ + RBucketReactive getBucket(String name, Codec codec); + + /** + * Returns transactional map instance by name. + * + * @param type of key + * @param type of value + * @param name - name of object + * @return Map object + */ + RMapReactive getMap(String name); + + /** + * Returns transactional map instance by name + * using provided codec for both map keys and values. + * + * @param type of key + * @param type of value + * @param name - name of object + * @param codec - codec for keys and values + * @return Map object + */ + RMapReactive getMap(String name, Codec codec); + + /** + * Returns transactional set instance by name. + * + * @param type of value + * @param name - name of object + * @return Set object + */ + RSetReactive getSet(String name); + + /** + * Returns transactional set instance by name + * using provided codec for set objects. + * + * @param type of value + * @param name - name of object + * @param codec - codec for values + * @return Set object + */ + RSetReactive getSet(String name, Codec codec); + + /** + * Returns transactional set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String)}.

+ * + * @param type of value + * @param name - name of object + * @return SetCache object + */ + RSetCacheReactive getSetCache(String name); + + /** + * Returns transactional set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param type of value + * @param name - name of object + * @param codec - codec for values + * @return SetCache object + */ + RSetCacheReactive getSetCache(String name, Codec codec); + + /** + * Returns transactional map-based cache instance by name. + * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String)}.

+ * + * @param type of key + * @param type of value + * @param name - name of object + * @return MapCache object + */ + RMapCacheReactive getMapCache(String name); + + /** + * Returns transactional map-based cache instance by name + * using provided codec for both cache keys and values. + * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}. + * + * @param type of key + * @param type of value + * @param name - object name + * @param codec - codec for keys and values + * @return MapCache object + */ + RMapCacheReactive getMapCache(String name, Codec codec); + + /** + * Commits all changes made on this transaction. + */ + Publisher commit(); + + /** + * Rollback all changes made on this transaction. + */ + Publisher rollback(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index b8d75ded4..42803b932 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -501,6 +501,14 @@ public interface RedissonReactiveClient { */ RScriptReactive getScript(); + /** + * Creates transaction with READ_COMMITTED isolation level. + * + * @param options - transaction configuration + * @return Transaction object + */ + RTransactionReactive createTransaction(TransactionOptions options); + /** * Return batch object which executes group of * command in pipeline. diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java index eead378f9..e359e68d4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java @@ -35,9 +35,14 @@ public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive impl private final RAtomicDoubleAsync instance; public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonAtomicDouble(commandExecutor, name); + this(commandExecutor, name, new RedissonAtomicDouble(commandExecutor, name)); } + + public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicDoubleAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; + } + @Override public Publisher addAndGet(final double delta) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java index 21ae86ef1..351df6f21 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java @@ -35,10 +35,14 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem private final RAtomicLongAsync instance; public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonAtomicLong(commandExecutor, name); + this(commandExecutor, name, new RedissonAtomicLong(commandExecutor, name)); } + public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicLongAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; + } + @Override public Publisher addAndGet(final long delta) { return reactive(new Supplier>() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java index 32ede4516..aee483fb9 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java @@ -42,12 +42,12 @@ abstract class RedissonBaseMultimapReactive extends RedissonExpirableReact private final RMultimap instance; public RedissonBaseMultimapReactive(RMultimap instance, CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); + super(commandExecutor, name, instance); this.instance = instance; } public RedissonBaseMultimapReactive(RMultimap instance, Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); + super(codec, commandExecutor, name, instance); this.instance = instance; } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java index 48e14e936..1a63e53ef 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java @@ -19,6 +19,7 @@ import java.util.BitSet; import org.reactivestreams.Publisher; import org.redisson.RedissonBitSet; +import org.redisson.api.RBitSetAsync; import org.redisson.api.RBitSetReactive; import org.redisson.api.RFuture; import org.redisson.client.codec.BitSetCodec; @@ -35,11 +36,15 @@ import reactor.rx.Streams; */ public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive { - private final RedissonBitSet instance; + private final RBitSetAsync instance; public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); - this.instance = new RedissonBitSet(connectionManager, name); + this(connectionManager, name, new RedissonBitSet(connectionManager, name)); + } + + public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name, RBitSetAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; } public Publisher get(final long bitIndex) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java index 20b2f2d45..22eeb6b5d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java @@ -38,13 +38,21 @@ public class RedissonBucketReactive extends RedissonExpirableReactive impleme private final RBucketAsync instance; public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); - instance = new RedissonBucket(connectionManager, name); + this(connectionManager, name, new RedissonBucket(connectionManager, name)); + } + + public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name, RBucketAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; } public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { - super(codec, connectionManager, name); - instance = new RedissonBucket(codec, connectionManager, name); + this(codec, connectionManager, name, new RedissonBucket(codec, connectionManager, name)); + } + + public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RBucketAsync instance) { + super(codec, connectionManager, name, instance); + this.instance = instance; } @Override @@ -56,6 +64,16 @@ public class RedissonBucketReactive extends RedissonExpirableReactive impleme } }); } + + @Override + public Publisher getAndDelete() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getAndDeleteAsync(); + } + }); + } @Override public Publisher set(final V value) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java index 5e3fcf3c6..05d96898c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java @@ -19,12 +19,14 @@ import java.util.Date; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; +import org.redisson.api.RExpirableAsync; import org.redisson.api.RExpirableReactive; +import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; +import reactor.fn.Supplier; + /** * * @author Nikita Koksharov @@ -32,22 +34,32 @@ import org.redisson.command.CommandReactiveExecutor; */ abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive { - RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); + RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { + super(connectionManager, name, instance); } - RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { - super(codec, connectionManager, name); + RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { + super(codec, connectionManager, name, instance); } @Override - public Publisher expire(long timeToLive, TimeUnit timeUnit) { - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toMillis(timeToLive)); + public Publisher expire(final long timeToLive, final TimeUnit timeUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAsync(timeToLive, timeUnit); + } + }); } @Override - public Publisher expireAt(long timestamp) { - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIREAT, getName(), timestamp); + public Publisher expireAt(final long timestamp) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAtAsync(timestamp); + } + }); } @Override @@ -57,12 +69,22 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen @Override public Publisher clearExpire() { - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.clearExpireAsync(); + } + }); } @Override public Publisher remainTimeToLive() { - return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.PTTL, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.remainTimeToLiveAsync(); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java index 4ee717b9d..e190f8e18 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java @@ -21,11 +21,16 @@ import java.util.Collection; import java.util.List; import org.reactivestreams.Publisher; +import org.redisson.RedissonHyperLogLog; +import org.redisson.api.RFuture; +import org.redisson.api.RHyperLogLogAsync; import org.redisson.api.RHyperLogLogReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; +import reactor.fn.Supplier; + /** * * @author Nikita Koksharov @@ -34,46 +39,66 @@ import org.redisson.command.CommandReactiveExecutor; */ public class RedissonHyperLogLogReactive extends RedissonExpirableReactive implements RHyperLogLogReactive { + private final RHyperLogLogAsync instance; + public RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); + super(commandExecutor, name, new RedissonHyperLogLog(commandExecutor, name)); + this.instance = (RHyperLogLogAsync) super.instance; } - + public RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); + super(codec, commandExecutor, name, new RedissonHyperLogLog(commandExecutor, name)); + this.instance = (RHyperLogLogAsync) super.instance; } @Override - public Publisher add(V obj) { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), encode(obj)); + public Publisher add(final V obj) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(obj); + } + }); } @Override - public Publisher addAll(Collection objects) { - List args = new ArrayList(objects.size() + 1); - args.add(getName()); - encode(args, objects); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), args.toArray()); + public Publisher addAll(final Collection objects) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAllAsync(objects); + } + }); } @Override public Publisher count() { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countAsync(); + } + }); } @Override - public Publisher countWith(String... otherLogNames) { - List args = new ArrayList(otherLogNames.length + 1); - args.add(getName()); - args.addAll(Arrays.asList(otherLogNames)); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, args.toArray()); + public Publisher countWith(final String... otherLogNames) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countWithAsync(otherLogNames); + } + }); } @Override - public Publisher mergeWith(String... otherLogNames) { - List args = new ArrayList(otherLogNames.length + 1); - args.add(getName()); - args.addAll(Arrays.asList(otherLogNames)); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFMERGE, args.toArray()); + public Publisher mergeWith(final String... otherLogNames) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.mergeWithAsync(otherLogNames); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index 16f61b091..08dfe44b4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -18,6 +18,7 @@ package org.redisson.reactive; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -25,6 +26,7 @@ import org.reactivestreams.Subscription; import org.redisson.RedissonKeys; import org.redisson.api.RFuture; import org.redisson.api.RKeysReactive; +import org.redisson.api.RType; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; @@ -220,4 +222,144 @@ public class RedissonKeysReactive implements RKeysReactive { }); } + @Override + public Publisher move(final String name, final int database) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.moveAsync(name, database); + } + }); + } + + @Override + public Publisher migrate(final String name, final String host, final int port, final int database, final long timeout) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.migrateAsync(name, host, port, database, timeout); + } + }); + } + + @Override + public Publisher copy(final String name, final String host, final int port, final int database, final long timeout) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.copyAsync(name, host, port, database, timeout); + } + }); + } + + @Override + public Publisher expire(final String name, final long timeToLive, final TimeUnit timeUnit) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAsync(name, timeToLive, timeUnit); + } + }); + } + + @Override + public Publisher expireAt(final String name, final long timestamp) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAtAsync(name, timestamp); + } + }); + } + + @Override + public Publisher clearExpire(final String name) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.clearExpireAsync(name); + } + }); + } + + @Override + public Publisher renamenx(final String oldName, final String newName) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renamenxAsync(oldName, newName); + } + }); + } + + @Override + public Publisher rename(final String currentName, final String newName) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renameAsync(currentName, newName); + } + }); + } + + @Override + public Publisher remainTimeToLive(final String name) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.remainTimeToLiveAsync(name); + } + }); + } + + @Override + public Publisher touch(final String... names) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.touchAsync(names); + } + }); + } + + @Override + public Publisher countExists(final String... names) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countExistsAsync(names); + } + }); + } + + @Override + public Publisher getType(final String key) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getTypeAsync(key); + } + }); + } + + @Override + public Publisher unlink(final String... keys) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.unlinkAsync(keys); + } + }); + } + + @Override + public Publisher count() { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.countAsync(); + } + }); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index fb550d25e..20674ad5d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -29,6 +29,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.redisson.RedissonList; import org.redisson.api.RFuture; +import org.redisson.api.RListAsync; import org.redisson.api.RListReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; @@ -51,16 +52,16 @@ import reactor.rx.subscription.ReactiveSubscription; */ public class RedissonListReactive extends RedissonExpirableReactive implements RListReactive { - private final RedissonList instance; + private final RListAsync instance; public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonList(commandExecutor, name, null); + super(commandExecutor, name, new RedissonList(commandExecutor, name, null)); + this.instance = (RListAsync) super.instance; } public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonList(codec, commandExecutor, name, null); + super(codec, commandExecutor, name, new RedissonList(codec, commandExecutor, name, null)); + this.instance = (RListAsync) super.instance; } @Override @@ -299,7 +300,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement return reactive(new Supplier>() { @Override public RFuture get() { - return instance.indexOfAsync(o, new LongReplayConvertor()); + return ((RedissonList)instance).indexOfAsync(o, new LongReplayConvertor()); } }); } @@ -309,7 +310,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement return reactive(new Supplier>() { @Override public RFuture get() { - return instance.lastIndexOfAsync(o, new LongReplayConvertor()); + return ((RedissonList)instance).lastIndexOfAsync(o, new LongReplayConvertor()); } }); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java index 02f3a6fa6..32c2ccc5c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java @@ -22,7 +22,6 @@ import org.redisson.RedissonLock; import org.redisson.api.RFuture; import org.redisson.api.RLockAsync; import org.redisson.api.RLockReactive; -import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; @@ -37,12 +36,12 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R private final RLockAsync instance; public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); - instance = createLock(connectionManager, name); + this(connectionManager, name, new RedissonLock(connectionManager, name)); } - protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { - return new RedissonLock(commandExecutor, name); + public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, RLockAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 7c28fdb21..a96493683 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -64,13 +64,21 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im private final RMapCacheAsync mapCache; public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(commandExecutor, name); - this.mapCache = new RedissonMapCache(evictionScheduler, commandExecutor, name, null, options); + this(commandExecutor, name, options, new RedissonMapCache(evictionScheduler, commandExecutor, name, null, options)); } + public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapCacheAsync mapCache) { + super(commandExecutor, name, mapCache); + this.mapCache = mapCache; + } + public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(codec, commandExecutor, name); - this.mapCache = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, options); + this(codec, commandExecutor, name, options, new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, options)); + } + + public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapCacheAsync mapCache) { + super(codec, commandExecutor, name, mapCache); + this.mapCache = mapCache; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index f4f2a8dbb..72ee38a5c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; import java.util.Map.Entry; @@ -55,13 +54,21 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme private final RMapAsync instance; public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name, null, options); + this(commandExecutor, name, options, new RedissonMap(commandExecutor, name, null, options)); + } + + public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; } public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { - super(codec, commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name, null, options); + this(codec, commandExecutor, name, options, new RedissonMap(codec, commandExecutor, name, null, options)); + } + + public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options, RMapAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 69180d102..1604b52f4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -20,10 +20,10 @@ import java.util.Collection; import org.reactivestreams.Publisher; import org.redisson.RedissonReference; +import org.redisson.api.RExpirableAsync; import org.redisson.api.RFuture; import org.redisson.api.RObjectReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; import org.redisson.misc.RedissonObjectFactory; @@ -43,19 +43,21 @@ abstract class RedissonObjectReactive implements RObjectReactive { final CommandReactiveExecutor commandExecutor; private final String name; final Codec codec; + protected RExpirableAsync instance; - public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { this.codec = codec; this.name = name; this.commandExecutor = commandExecutor; + this.instance = instance; } public Publisher reactive(Supplier> supplier) { return commandExecutor.reactive(supplier); } - public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name) { - this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); + public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { + this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance); } protected Stream newSucceeded(V result) { @@ -124,33 +126,63 @@ abstract class RedissonObjectReactive implements RObjectReactive { } @Override - public Publisher rename(String newName) { - return commandExecutor.writeReactive(getName(), RedisCommands.RENAME, getName(), newName); + public Publisher rename(final String newName) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renameAsync(newName); + } + }); } @Override - public Publisher migrate(String host, int port, int database) { - return commandExecutor.writeReactive(getName(), RedisCommands.MIGRATE, host, port, getName(), database); + public Publisher migrate(final String host, final int port, final int database, final long timeout) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.migrateAsync(host, port, database, timeout); + } + }); } @Override - public Publisher move(int database) { - return commandExecutor.writeReactive(getName(), RedisCommands.MOVE, getName(), database); + public Publisher move(final int database) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.moveAsync(database); + } + }); } @Override - public Publisher renamenx(String newName) { - return commandExecutor.writeReactive(getName(), RedisCommands.RENAMENX, getName(), newName); + public Publisher renamenx(final String newName) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.renamenxAsync(newName); + } + }); } @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.deleteAsync(); + } + }); } @Override public Publisher isExists() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.EXISTS, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.isExistsAsync(); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java index f6047026e..55d22101a 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java @@ -40,8 +40,8 @@ public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableR private final RPermitExpirableSemaphoreAsync instance; public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { - super(connectionManager, name); - instance = new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub); + super(connectionManager, name, new RedissonPermitExpirableSemaphore(connectionManager, name, semaphorePubSub)); + instance = (RPermitExpirableSemaphoreAsync) super.instance; } protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java index cdb94b615..80d63d706 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java @@ -16,11 +16,9 @@ package org.redisson.reactive; import org.redisson.RedissonReadWriteLock; -import org.redisson.api.RLockAsync; import org.redisson.api.RLockReactive; import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLockReactive; -import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandReactiveExecutor; /** @@ -33,28 +31,18 @@ public class RedissonReadWriteLockReactive extends RedissonExpirableReactive imp private final RReadWriteLock instance; public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - this.instance = new RedissonReadWriteLock(commandExecutor, name); + super(commandExecutor, name, new RedissonReadWriteLock(commandExecutor, name)); + this.instance = (RReadWriteLock) super.instance; } @Override public RLockReactive readLock() { - return new RedissonLockReactive(commandExecutor, getName()) { - @Override - protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { - return instance.readLock(); - } - }; + return new RedissonLockReactive(commandExecutor, getName(), instance.readLock()); } @Override public RLockReactive writeLock() { - return new RedissonLockReactive(commandExecutor, getName()) { - @Override - protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { - return instance.writeLock(); - } - }; + return new RedissonLockReactive(commandExecutor, getName(), instance.writeLock()); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 211446bab..aa54a14cc 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; @@ -47,13 +46,21 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv private final RScoredSortedSetAsync instance; public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonScoredSortedSet(commandExecutor, name, null); + this(commandExecutor, name, new RedissonScoredSortedSet(commandExecutor, name, null)); } + public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; + } + public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonScoredSortedSet(codec, commandExecutor, name, null); + this(codec, commandExecutor, name, new RedissonScoredSortedSet(codec, commandExecutor, name, null)); + } + + public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RScoredSortedSetAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java index cf88a2489..473819eaa 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java @@ -37,8 +37,8 @@ public class RedissonSemaphoreReactive extends RedissonExpirableReactive impleme private final RSemaphoreAsync instance; public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { - super(connectionManager, name); - instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub); + super(connectionManager, name, new RedissonSemaphore(connectionManager, name, semaphorePubSub)); + instance = (RSemaphoreAsync) super.instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 96d328a0b..8ea1ffb0b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -24,7 +23,9 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonSetCache; +import org.redisson.ScanIterator; import org.redisson.api.RFuture; +import org.redisson.api.RSetCacheAsync; import org.redisson.api.RSetCacheReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -59,18 +60,27 @@ import reactor.fn.Supplier; */ public class RedissonSetCacheReactive extends RedissonExpirableReactive implements RSetCacheReactive { - private final RedissonSetCache instance; + private final RSetCacheAsync instance; public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); + this(commandExecutor, name, new RedissonSetCache(evictionScheduler, commandExecutor, name, null)); + } + + public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; } public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); + this(codec, commandExecutor, name, new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null)); } + public RedissonSetCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; + } + + @Override public Publisher size() { return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName()); @@ -90,7 +100,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple return reactive(new Supplier>>() { @Override public RFuture> get() { - return instance.scanIteratorAsync(getName(), client, startPos, null); + return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null); } }); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 4a9c621af..12d353761 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -25,6 +24,7 @@ import java.util.Set; import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RFuture; +import org.redisson.api.RSetAsync; import org.redisson.api.RSetReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -45,17 +45,26 @@ import reactor.fn.Supplier; */ public class RedissonSetReactive extends RedissonExpirableReactive implements RSetReactive { - private final RedissonSet instance; + private final RSetAsync instance; public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name); - instance = new RedissonSet(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null); + this(commandExecutor, name, new RedissonSet(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null)); + } + + public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name, RSetAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; } public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); - instance = new RedissonSet(codec, commandExecutor, name, null); + this(codec, commandExecutor, name, new RedissonSet(codec, commandExecutor, name, null)); } + + public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; + } + @Override public Publisher addAll(Publisher c) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java new file mode 100644 index 000000000..f234ebc3a --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -0,0 +1,119 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import org.reactivestreams.Publisher; +import org.redisson.api.RBucketReactive; +import org.redisson.api.RFuture; +import org.redisson.api.RMapCacheReactive; +import org.redisson.api.RMapReactive; +import org.redisson.api.RSetCacheReactive; +import org.redisson.api.RSetReactive; +import org.redisson.api.RTransaction; +import org.redisson.api.RTransactionReactive; +import org.redisson.api.TransactionOptions; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.transaction.RedissonTransaction; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonTransactionReactive implements RTransactionReactive { + + private final RTransaction transaction; + private final CommandReactiveExecutor executorService; + + public RedissonTransactionReactive(CommandReactiveExecutor executorService, TransactionOptions options) { + this.transaction = new RedissonTransaction(executorService, options); + this.executorService = executorService; + } + + @Override + public RBucketReactive getBucket(String name) { + return new RedissonBucketReactive(executorService, name, transaction.getBucket(name)); + } + + @Override + public RBucketReactive getBucket(String name, Codec codec) { + return new RedissonBucketReactive(codec, executorService, name, transaction.getBucket(name, codec)); + } + + @Override + public RMapReactive getMap(String name) { + return new RedissonMapReactive(executorService, name, null, transaction.getMap(name)); + } + + @Override + public RMapReactive getMap(String name, Codec codec) { + return new RedissonMapReactive(codec, executorService, name, null, transaction.getMap(name, codec)); + } + + @Override + public RMapCacheReactive getMapCache(String name, Codec codec) { + return new RedissonMapCacheReactive(codec, executorService, name, null, transaction.getMapCache(name, codec)); + } + + @Override + public RMapCacheReactive getMapCache(String name) { + return new RedissonMapCacheReactive(executorService, name, null, transaction.getMapCache(name)); + } + + @Override + public RSetReactive getSet(String name) { + return new RedissonSetReactive(executorService, name, transaction.getSet(name)); + } + + @Override + public RSetReactive getSet(String name, Codec codec) { + return new RedissonSetReactive(codec, executorService, name, transaction.getSet(name, codec)); + } + + @Override + public RSetCacheReactive getSetCache(String name) { + return new RedissonSetCacheReactive(executorService, name, transaction.getSetCache(name)); + } + + @Override + public RSetCacheReactive getSetCache(String name, Codec codec) { + return new RedissonSetCacheReactive(codec, executorService, name, transaction.getSetCache(name, codec)); + } + + @Override + public Publisher commit() { + return new NettyFuturePublisher(new Supplier>() { + @Override + public RFuture get() { + return transaction.commitAsync(); + } + }); + } + + @Override + public Publisher rollback() { + return new NettyFuturePublisher(new Supplier>() { + @Override + public RFuture get() { + return transaction.rollbackAsync(); + } + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index df9efb475..2d82ec049 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -510,7 +510,7 @@ public class BaseTransactionalMap { executeLocked(result, new Runnable() { @Override public void run() { - AtomicLong counter = new AtomicLong(); + final AtomicLong counter = new AtomicLong(); List keyList = Arrays.asList(keys); for (Iterator iterator = keyList.iterator(); iterator.hasNext();) { K key = iterator.next(); diff --git a/redisson/src/main/java/org/redisson/transaction/HashKey.java b/redisson/src/main/java/org/redisson/transaction/HashKey.java index 97be23502..1ec9a2921 100644 --- a/redisson/src/main/java/org/redisson/transaction/HashKey.java +++ b/redisson/src/main/java/org/redisson/transaction/HashKey.java @@ -1,3 +1,18 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.transaction; import org.redisson.client.codec.Codec; diff --git a/redisson/src/main/java/org/redisson/transaction/HashValue.java b/redisson/src/main/java/org/redisson/transaction/HashValue.java index 7977a2de2..0406446ad 100644 --- a/redisson/src/main/java/org/redisson/transaction/HashValue.java +++ b/redisson/src/main/java/org/redisson/transaction/HashValue.java @@ -1,3 +1,18 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.transaction; import java.util.ArrayList; diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 8fd79fdd5..0f82016b9 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -176,13 +175,14 @@ public class RedissonTransaction implements RTransaction { checkTimeout(); - CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager()); + final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager()); for (TransactionalOperation transactionalOperation : operations) { + System.out.println("transactionalOperation " + transactionalOperation); transactionalOperation.commit(transactionExecutor); } - String id = generateId(); - RPromise result = new RedissonPromise(); + final String id = generateId(); + final RPromise result = new RedissonPromise(); RFuture> future = disableLocalCacheAsync(id); future.addListener(new FutureListener>() { @Override @@ -192,7 +192,7 @@ public class RedissonTransaction implements RTransaction { return; } - Map hashes = future.getNow(); + final Map hashes = future.getNow(); try { checkTimeout(); } catch (TransactionTimeoutException e) { @@ -420,13 +420,13 @@ public class RedissonTransaction implements RTransaction { return hashes; } - private RFuture> disableLocalCacheAsync(String requestId) { + private RFuture> disableLocalCacheAsync(final String requestId) { if (localCaches.isEmpty()) { - return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + return RedissonPromise.newSucceededFuture(Collections.emptyMap()); } - RPromise> result = new RedissonPromise>(); - Map hashes = new HashMap(localCaches.size()); + final RPromise> result = new RedissonPromise>(); + final Map hashes = new HashMap(localCaches.size()); RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); for (TransactionalOperation transactionalOperation : operations) { if (localCaches.contains(transactionalOperation.getName())) { @@ -459,14 +459,14 @@ public class RedissonTransaction implements RTransaction { return; } - CountableListener> listener = + final CountableListener> listener = new CountableListener>(result, hashes); listener.setCounter(hashes.size()); RPromise subscriptionFuture = new RedissonPromise(); - CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null); + final CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null); subscribedFutures.setCounter(hashes.size()); - List> topics = new ArrayList>(); + final List> topics = new ArrayList>(); for (final Entry entry : hashes.entrySet()) { final String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX); RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, @@ -588,7 +588,7 @@ public class RedissonTransaction implements RTransaction { transactionalOperation.rollback(executorService); } - RPromise result = new RedissonPromise(); + final RPromise result = new RedissonPromise(); RFuture future = executorService.executeAsync(BatchOptions.defaults()); future.addListener(new FutureListener() { @Override diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java new file mode 100644 index 000000000..1ba055b6c --- /dev/null +++ b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketReactiveTest.java @@ -0,0 +1,148 @@ +package org.redisson.transaction; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.BaseReactiveTest; +import org.redisson.api.RBucketReactive; +import org.redisson.api.RTransactionReactive; +import org.redisson.api.TransactionOptions; + +public class RedissonTransactionalBucketReactiveTest extends BaseReactiveTest { + + @Test + public void testTimeout() throws InterruptedException { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults().timeout(3, TimeUnit.SECONDS)); + RBucketReactive bucket = transaction.getBucket("test"); + sync(bucket.set("234")); + + Thread.sleep(3000); + + try { + sync(transaction.commit()); + Assert.fail(); + } catch (TransactionException e) { + // skip + } + + Thread.sleep(1000); + + assertThat(sync(b.get())).isEqualTo("123"); + } + + @Test + public void testSet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + sync(bucket.set("234")); + assertThat(sync(bucket.get())).isEqualTo("234"); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("234"); + } + + @Test + public void testGetAndSet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.getAndSet("0"))).isEqualTo("123"); + assertThat(sync(bucket.get())).isEqualTo("0"); + assertThat(sync(bucket.getAndSet("324"))).isEqualTo("0"); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("324"); + } + + @Test + public void testCompareAndSet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.compareAndSet("0", "434"))).isFalse(); + assertThat(sync(bucket.get())).isEqualTo("123"); + assertThat(sync(bucket.compareAndSet("123", "232"))).isTrue(); + assertThat(sync(bucket.get())).isEqualTo("232"); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("232"); + } + + @Test + public void testTrySet() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.trySet("0"))).isFalse(); + assertThat(sync(bucket.delete())).isTrue(); + assertThat(sync(bucket.trySet("324"))).isTrue(); + assertThat(sync(bucket.trySet("43"))).isFalse(); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + assertThat(sync(b.get())).isEqualTo("324"); + } + + @Test + public void testGetAndRemove() { + RBucketReactive m = redisson.getBucket("test"); + sync(m.set("123")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive set = transaction.getBucket("test"); + assertThat(sync(set.get())).isEqualTo("123"); + assertThat(sync(set.size())).isEqualTo(5); + assertThat(sync(set.getAndDelete())).isEqualTo("123"); + assertThat(sync(set.size())).isEqualTo(0); + assertThat(sync(set.get())).isNull(); + assertThat(sync(set.getAndDelete())).isNull(); + + sync(transaction.commit()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(0); + assertThat(sync(m.get())).isNull(); + } + + @Test + public void testRollback() { + RBucketReactive b = redisson.getBucket("test"); + sync(b.set("1234")); + + RTransactionReactive transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBucketReactive bucket = transaction.getBucket("test"); + assertThat(sync(bucket.get())).isEqualTo("1234"); + assertThat(sync(bucket.getAndDelete())).isEqualTo("1234"); + + assertThat(sync(b.get())).isEqualTo("1234"); + + sync(transaction.rollback()); + + assertThat(sync(redisson.getKeys().count())).isEqualTo(1); + + assertThat(sync(b.get())).isEqualTo("1234"); + } + + +}