From daac20fcd2eee54bfb3974b353866305cc7ab42e Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 May 2018 09:35:36 +0300 Subject: [PATCH] RListReactive, RMapCacheReactive, RSetCacheReactive and RSetReactive are up-to-date to Async interfaces. #1441 --- .../java/org/redisson/RedissonMultimap.java | 4 - .../src/main/java/org/redisson/api/RList.java | 5 + .../java/org/redisson/api/RListReactive.java | 52 ++++++++ .../org/redisson/api/RMapCacheReactive.java | 27 +++++ .../org/redisson/api/RMultimapReactive.java | 2 +- .../org/redisson/api/RSetCacheReactive.java | 8 ++ .../java/org/redisson/api/RSetReactive.java | 25 ++++ .../RedissonBaseMultimapReactive.java | 65 +++++----- .../RedissonListMultimapReactive.java | 113 +++++------------ .../reactive/RedissonListReactive.java | 72 ++++++++++- .../reactive/RedissonMapCacheReactive.java | 30 +++++ .../reactive/RedissonSetCacheReactive.java | 11 ++ .../reactive/RedissonSetMultimapReactive.java | 114 +++++------------- .../reactive/RedissonSetReactive.java | 37 +++++- 14 files changed, 360 insertions(+), 205 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMultimap.java b/redisson/src/main/java/org/redisson/RedissonMultimap.java index a01ee66a3..4b2f20224 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonMultimap.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; @@ -56,18 +55,15 @@ import io.netty.buffer.ByteBuf; */ public abstract class RedissonMultimap extends RedissonExpirable implements RMultimap { - private final UUID id; final String prefix; RedissonMultimap(CommandAsyncExecutor commandAsyncExecutor, String name) { super(commandAsyncExecutor, name); - this.id = commandAsyncExecutor.getConnectionManager().getId(); prefix = suffixName(getName(), ""); } RedissonMultimap(Codec codec, CommandAsyncExecutor commandAsyncExecutor, String name) { super(codec, commandAsyncExecutor, name); - this.id = commandAsyncExecutor.getConnectionManager().getId(); prefix = suffixName(getName(), ""); } diff --git a/redisson/src/main/java/org/redisson/api/RList.java b/redisson/src/main/java/org/redisson/api/RList.java index 14c457493..1929383a6 100644 --- a/redisson/src/main/java/org/redisson/api/RList.java +++ b/redisson/src/main/java/org/redisson/api/RList.java @@ -92,6 +92,11 @@ public interface RList extends List, RExpirable, RListAsync, RSortable< */ void trim(int fromIndex, int toIndex); + /** + * Remove object by specified index + * + * @param index - index of object + */ void fastRemove(int index); } diff --git a/redisson/src/main/java/org/redisson/api/RListReactive.java b/redisson/src/main/java/org/redisson/api/RListReactive.java index f27118412..c27d3c91b 100644 --- a/redisson/src/main/java/org/redisson/api/RListReactive.java +++ b/redisson/src/main/java/org/redisson/api/RListReactive.java @@ -16,6 +16,7 @@ package org.redisson.api; import java.util.Collection; +import java.util.List; import org.reactivestreams.Publisher; @@ -29,6 +30,32 @@ import org.reactivestreams.Publisher; // TODO add sublist support public interface RListReactive extends RCollectionReactive { + /** + * Loads elements by specified indexes + * + * @param indexes of elements + * @return elements + */ + Publisher> get(int ...indexes); + + /** + * Add element after elementToFind + * + * @param elementToFind - object to find + * @param element - object to add + * @return new list size + */ + Publisher addAfter(V elementToFind, V element); + + /** + * Add element before elementToFind + * + * @param elementToFind - object to find + * @param element - object to add + * @return new list size + */ + Publisher addBefore(V elementToFind, V element); + Publisher descendingIterator(); Publisher descendingIterator(int startIndex); @@ -50,5 +77,30 @@ public interface RListReactive extends RCollectionReactive { Publisher get(long index); Publisher remove(long index); + + /** + * Read all elements at once + * + * @return list of values + */ + Publisher> readAll(); + /** + * Trim list and remains elements only in specified range + * fromIndex, inclusive, and toIndex, inclusive. + * + * @param fromIndex - from index + * @param toIndex - to index + * @return void + */ + Publisher trim(int fromIndex, int toIndex); + + /** + * Remove object by specified index + * + * @param index - index of object + * @return void + */ + Publisher fastRemove(long index); + } diff --git a/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java b/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java index 43a792779..f41ccee69 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java @@ -40,6 +40,24 @@ import org.reactivestreams.Publisher; */ public interface RMapCacheReactive extends RMapReactive { + /** + * Sets max size of the map. + * Superfluous elements are evicted using LRU algorithm. + * + * @param maxSize - max size + * @return void + */ + Publisher setMaxSize(int maxSize); + + /** + * Tries to set max size of the map. + * Superfluous elements are evicted using LRU algorithm. + * + * @param maxSize - max size + * @return true if max size has been successfully set, otherwise false. + */ + Publisher trySetMaxSize(int maxSize); + /** * If the specified key is not already associated * with a value, associate it with the given value. @@ -205,5 +223,14 @@ public interface RMapCacheReactive extends RMapReactive { */ @Override Publisher size(); + + /** + * Remaining time to live of map entry associated with a key. + * + * @return time in milliseconds + * -2 if the key does not exist. + * -1 if the key exists but has no associated expire. + */ + Publisher remainTimeToLive(K key); } diff --git a/redisson/src/main/java/org/redisson/api/RMultimapReactive.java b/redisson/src/main/java/org/redisson/api/RMultimapReactive.java index 80a3d5489..2c591095c 100644 --- a/redisson/src/main/java/org/redisson/api/RMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMultimapReactive.java @@ -26,7 +26,7 @@ import org.reactivestreams.Publisher; * @param key type * @param value type */ -public interface RMultimapReactive { +public interface RMultimapReactive extends RExpirableReactive { /** * Returns the number of key-value pairs in this multimap. diff --git a/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java b/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java index dc72cb02b..abf621229 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; @@ -39,4 +40,11 @@ public interface RSetCacheReactive extends RCollectionReactive { @Override Publisher size(); + /** + * Read all elements at once + * + * @return values + */ + Publisher> readAll(); + } diff --git a/redisson/src/main/java/org/redisson/api/RSetReactive.java b/redisson/src/main/java/org/redisson/api/RSetReactive.java index f61b84fbd..c23f87283 100644 --- a/redisson/src/main/java/org/redisson/api/RSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetReactive.java @@ -28,6 +28,15 @@ import org.reactivestreams.Publisher; */ public interface RSetReactive extends RCollectionReactive { + /** + * Removes and returns random elements from set + * in async mode + * + * @param amount of random values + * @return random values + */ + Publisher> removeRandom(int amount); + /** * Removes and returns random element from set * in async mode @@ -54,6 +63,13 @@ public interface RSetReactive extends RCollectionReactive { */ Publisher move(String destination, V member); + /** + * Read all elements at once + * + * @return values + */ + Publisher> readAll(); + /** * Union sets specified by name and write to current set. * If current set already exists, it is overwritten. @@ -81,6 +97,15 @@ public interface RSetReactive extends RCollectionReactive { */ Publisher diff(String... names); + /** + * Diff sets specified by name with current set. + * Without current set state change. + * + * @param names - name of sets + * @return values + */ + Publisher> readDiff(String... names); + /** * Intersection sets specified by name and write to current set. * If current set already exists, it is overwritten. diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java index aee483fb9..3b5304e03 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBaseMultimapReactive.java @@ -15,19 +15,16 @@ */ package org.redisson.reactive; -import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.api.RFuture; import org.redisson.api.RMultimap; import org.redisson.api.RMultimapReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; import org.redisson.command.CommandReactiveExecutor; -import org.redisson.misc.Hash; -import io.netty.buffer.ByteBuf; import reactor.fn.Supplier; /** @@ -39,7 +36,7 @@ import reactor.fn.Supplier; */ abstract class RedissonBaseMultimapReactive extends RedissonExpirableReactive implements RMultimapReactive { - private final RMultimap instance; + protected final RMultimap instance; public RedissonBaseMultimapReactive(RMultimap instance, CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name, instance); @@ -150,31 +147,45 @@ abstract class RedissonBaseMultimapReactive extends RedissonExpirableReact } }); } - - protected String hash(ByteBuf objectState) { - return Hash.hash128toBase64(objectState); - } - protected String hashAndRelease(ByteBuf objectState) { - try { - return Hash.hash128toBase64(objectState); - } finally { - objectState.release(); - } + @Override + public Publisher delete() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.deleteAsync(); + } + }); } - String getValuesName(String hash) { - return "{" + getName() + "}:" + hash; + @Override + public Publisher expire(final long timeToLive, final TimeUnit timeUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAsync(timeToLive, timeUnit); + } + }); } - - protected Publisher fastRemove(List mapKeys, List listKeys, RedisCommand evalCommandType) { - return commandExecutor.evalWriteReactive(getName(), codec, evalCommandType, - "local res = redis.call('hdel', KEYS[1], unpack(ARGV)); " + - "if res > 0 then " + - "redis.call('del', unpack(KEYS, 2, #KEYS)); " + - "end; " + - "return res; ", - listKeys, mapKeys.toArray()); + + @Override + public Publisher expireAt(final long timestamp) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.expireAtAsync(timestamp); + } + }); } - + + @Override + public Publisher clearExpire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.clearExpireAsync(); + } + }); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java index da8cefec9..fc6ca2bb7 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java @@ -15,21 +15,20 @@ */ package org.redisson.reactive; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonListMultimap; +import org.redisson.api.RFuture; +import org.redisson.api.RList; +import org.redisson.api.RListMultimap; import org.redisson.api.RListMultimapReactive; import org.redisson.api.RListReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; -import io.netty.buffer.ByteBuf; +import reactor.fn.Supplier; /** * @@ -49,95 +48,39 @@ public class RedissonListMultimapReactive extends RedissonBaseMultimapReac } @Override - public RListReactive get(final K key) { - final ByteBuf keyState = encodeMapKey(key); - final String keyHash = hashAndRelease(keyState); - final String setName = getValuesName(keyHash); - - return new RedissonListReactive(codec, commandExecutor, setName) { - - @Override - public Publisher delete() { - ByteBuf keyState = encodeMapKey(key); - return RedissonListMultimapReactive.this.fastRemove(Arrays.asList(keyState), Arrays.asList(setName), RedisCommands.EVAL_BOOLEAN_AMOUNT); - } - - @Override - public Publisher clearExpire() { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher expire(long timeToLive, TimeUnit timeUnit) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher expireAt(long timestamp) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher remainTimeToLive() { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher rename(String newName) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher renamenx(String newName) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - }; + public RListReactive get(K key) { + RList list = ((RListMultimap)instance).get(key); + return new RedissonListReactive(codec, commandExecutor, list.getName(), list); } @Override - public Publisher> getAll(K key) { - ByteBuf keyState = encodeMapKey(key); - String keyHash = hashAndRelease(keyState); - String setName = getValuesName(keyHash); - - return commandExecutor.readReactive(getName(), codec, RedisCommands.LRANGE, setName, 0, -1); + public Publisher> getAll(final K key) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return (RFuture>)(Object)((RListMultimap)instance).getAllAsync(key); + } + }); } @Override - public Publisher> removeAll(Object key) { - ByteBuf keyState = encodeMapKey(key); - String keyHash = hash(keyState); - - String setName = getValuesName(keyHash); - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LIST, - "redis.call('hdel', KEYS[1], ARGV[1]); " + - "local members = redis.call('lrange', KEYS[2], 0, -1); " + - "redis.call('del', KEYS[2]); " + - "return members; ", - Arrays.asList(getName(), setName), keyState); + public Publisher> removeAll(final Object key) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return (RFuture>)(Object)((RListMultimap)instance).removeAllAsync(key); + } + }); } @Override - public Publisher> replaceValues(K key, Iterable values) { - List params = new ArrayList(); - ByteBuf keyState = encodeMapKey(key); - params.add(keyState); - String keyHash = hash(keyState); - params.add(keyHash); - for (Object value : values) { - ByteBuf valueState = encodeMapValue(value); - params.add(valueState); - } - - String setName = getValuesName(keyHash); - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LIST, - "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + - "local members = redis.call('lrange', KEYS[2], 0, -1); " + - "redis.call('del', KEYS[2]); " + - "redis.call('rpush', KEYS[2], unpack(ARGV, 3, #ARGV)); " + - "return members; ", - Arrays.asList(getName(), setName), params.toArray()); + public Publisher> replaceValues(final K key, final Iterable values) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return (RFuture>)(Object)((RListMultimap)instance).replaceValuesAsync(key, values); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index 20674ad5d..f8c43d953 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -63,10 +63,20 @@ public class RedissonListReactive extends RedissonExpirableReactive implement super(codec, commandExecutor, name, new RedissonList(codec, commandExecutor, name, null)); this.instance = (RListAsync) super.instance; } + + public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RListAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = (RListAsync) super.instance; + } @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.LLEN_INT, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sizeAsync(); + } + }); } @Override @@ -143,7 +153,67 @@ public class RedissonListReactive extends RedissonExpirableReactive implement }; } + + @Override + public Publisher trim(final int fromIndex, final int toIndex) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.trimAsync(fromIndex, toIndex); + } + }); + } + + @Override + public Publisher fastRemove(final long index) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.fastRemoveAsync(index); + } + }); + } + @Override + public Publisher> readAll() { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readAllAsync(); + } + }); + } + + @Override + public Publisher addBefore(final V elementToFind, final V element) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addBeforeAsync(elementToFind, element); + } + }); + } + + @Override + public Publisher addAfter(final V elementToFind, final V element) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAfterAsync(elementToFind, element); + } + }); + } + + @Override + public Publisher> get(final int ...indexes) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.getAsync(indexes); + } + }); + } + @Override public Publisher add(V e) { return commandExecutor.writeReactive(getName(), codec, RPUSH, getName(), encode(e)); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index a96493683..a896d0ca4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -81,6 +81,36 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im this.mapCache = mapCache; } + @Override + public Publisher setMaxSize(final int maxSize) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return mapCache.setMaxSizeAsync(maxSize); + } + }); + } + + @Override + public Publisher trySetMaxSize(final int maxSize) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return mapCache.trySetMaxSizeAsync(maxSize); + } + }); + } + + @Override + public Publisher remainTimeToLive(final K key) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return mapCache.remainTimeToLiveAsync(key); + } + }); + } + @Override public Publisher containsKey(final Object key) { return reactive(new Supplier>() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 8ea1ffb0b..ff4ad7e93 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; @@ -138,6 +139,16 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple Arrays.asList(getName()), System.currentTimeMillis(), timeoutDate, encode(value)); } + @Override + public Publisher> readAll() { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readAllAsync(); + } + }); + } + @Override public Publisher remove(final Object o) { return reactive(new Supplier>() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java index 6d83c6b06..89d199574 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java @@ -15,22 +15,20 @@ */ package org.redisson.reactive; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonSetMultimap; +import org.redisson.api.RFuture; +import org.redisson.api.RSet; +import org.redisson.api.RSetMultimap; import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; -import io.netty.buffer.ByteBuf; +import reactor.fn.Supplier; /** * @@ -50,95 +48,39 @@ public class RedissonSetMultimapReactive extends RedissonBaseMultimapReact } @Override - public RSetReactive get(final K key) { - final ByteBuf keyState = encodeMapKey(key); - final String keyHash = hashAndRelease(keyState); - final String setName = getValuesName(keyHash); - - return new RedissonSetReactive(codec, commandExecutor, setName) { - - @Override - public Publisher delete() { - ByteBuf keyState = encodeMapKey(key); - return RedissonSetMultimapReactive.this.fastRemove(Arrays.asList(keyState), Arrays.asList(setName), RedisCommands.EVAL_BOOLEAN_AMOUNT); - } - - @Override - public Publisher clearExpire() { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher expire(long timeToLive, TimeUnit timeUnit) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher expireAt(long timestamp) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher remainTimeToLive() { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher rename(String newName) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - @Override - public Publisher renamenx(String newName) { - throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set"); - } - - }; + public RSetReactive get(K key) { + RSet set = ((RSetMultimap)instance).get(key); + return new RedissonSetReactive(codec, commandExecutor, set.getName(), set); } @Override - public Publisher> getAll(K key) { - ByteBuf keyState = encodeMapKey(key); - String keyHash = hashAndRelease(keyState); - String setName = getValuesName(keyHash); - - return commandExecutor.readReactive(getName(), codec, RedisCommands.SMEMBERS, setName); + public Publisher> getAll(final K key) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return (RFuture>)(Object)((RSetMultimap)instance).getAllAsync(key); + } + }); } @Override - public Publisher> removeAll(Object key) { - ByteBuf keyState = encodeMapKey(key); - String keyHash = hash(keyState); - - String setName = getValuesName(keyHash); - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_SET, - "redis.call('hdel', KEYS[1], ARGV[1]); " + - "local members = redis.call('smembers', KEYS[2]); " + - "redis.call('del', KEYS[2]); " + - "return members; ", - Arrays.asList(getName(), setName), keyState); + public Publisher> removeAll(final Object key) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return (RFuture>)(Object)((RSetMultimap)instance).removeAllAsync(key); + } + }); } @Override - public Publisher> replaceValues(K key, Iterable values) { - List params = new ArrayList(); - ByteBuf keyState = encodeMapKey(key); - params.add(keyState); - String keyHash = hash(keyState); - params.add(keyHash); - for (Object value : values) { - ByteBuf valueState = encodeMapValue(value); - params.add(valueState); - } - - String setName = getValuesName(keyHash); - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_SET, - "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + - "local members = redis.call('smembers', KEYS[2]); " + - "redis.call('del', KEYS[2]); " + - "redis.call('sadd', KEYS[2], unpack(ARGV, 3, #ARGV)); " + - "return members; ", - Arrays.asList(getName(), setName), params.toArray()); + public Publisher> replaceValues(final K key, final Iterable values) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return (RFuture>)(Object)((RSetMultimap)instance).replaceValuesAsync(key, values); + } + }); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 12d353761..5f44b6a31 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -71,9 +71,24 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements return new PublisherAdder(this).addAll(c); } + @Override + public Publisher> removeRandom(final int amount) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.removeRandomAsync(amount); + } + }); + } + @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD_INT, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sizeAsync(); + } + }); } @Override @@ -85,6 +100,16 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements } }); } + + @Override + public Publisher> readAll() { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readAllAsync(); + } + }); + } private Publisher> scanIteratorReactive(RedisClient client, long startPos) { return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos); @@ -199,6 +224,16 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements return commandExecutor.writeReactive(getName(), codec, RedisCommands.SDIFFSTORE, args.toArray()); } + @Override + public Publisher> readDiff(final String... names) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readDiffAsync(names); + } + }); + } + @Override public Publisher union(String... names) { List args = new ArrayList(names.length + 1);