From 1e0a1f16e6072f1635e6d2a94de89db3df92898e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 14 Apr 2023 10:03:34 +0300 Subject: [PATCH] Feature - RSetCache extends RSet interface. #4954 --- .../java/org/redisson/RedissonSetCache.java | 713 +++++++++++++++++- .../main/java/org/redisson/api/RSetCache.java | 131 +--- .../java/org/redisson/api/RSetCacheAsync.java | 19 +- .../client/protocol/RedisCommands.java | 1 + .../org/redisson/RedissonSetCacheTest.java | 112 ++- 5 files changed, 808 insertions(+), 168 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 928d7855e..677c78022 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -16,10 +16,13 @@ package org.redisson; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import org.redisson.api.*; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.IntegerCodec; +import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; @@ -51,8 +54,9 @@ import java.util.stream.Stream; */ public class RedissonSetCache extends RedissonExpirable implements RSetCache, ScanIterator { - RedissonClient redisson; - EvictionScheduler evictionScheduler; + final RedissonClient redisson; + final EvictionScheduler evictionScheduler; + final RedissonScoredSortedSet scoredSortedSet; public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name); @@ -61,6 +65,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< } this.evictionScheduler = evictionScheduler; this.redisson = redisson; + this.scoredSortedSet = new RedissonScoredSortedSet(commandExecutor, name, redisson); } public RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { @@ -70,6 +75,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< } this.evictionScheduler = evictionScheduler; this.redisson = redisson; + this.scoredSortedSet = new RedissonScoredSortedSet(codec, commandExecutor, name, redisson); } @Override @@ -79,12 +85,16 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override public int size() { - return get(sizeAsync()); + return scoredSortedSet.size(); } @Override public RFuture sizeAsync() { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.ZCARD_INT, getRawName()); + return commandExecutor.evalReadAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + "local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2]);" + + "return #values;", + Arrays.asList(getRawName()), + System.currentTimeMillis(), 92233720368547758L); } @Override @@ -146,7 +156,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< + "end; " + "end;" + "end;" - + "return {res[1], result};", Arrays.asList(name), params.toArray()); + + "return {res[1], result};", Arrays.asList(name), params.toArray()); } @Override @@ -160,7 +170,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< } @Override - public Iterator iterator(final String pattern, final int count) { + public Iterator iterator(String pattern, int count) { return new RedissonBaseIterator() { @Override @@ -459,4 +469,695 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return toStream(iterator(pattern)); } + @Override + public int addAllCounted(Collection c) { + return get(addAllCountedAsync(c)); + } + + @Override + public int removeAllCounted(Collection c) { + return get(removeAllCountedAsync(c)); + } + + @Override + public Iterator distributedIterator(String pattern) { + String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, pattern, 10); + } + + @Override + public Iterator distributedIterator(int count) { + String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, null, count); + } + + @Override + public Iterator distributedIterator(String iteratorName, String pattern, int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, pattern, count); + } + + @Override + protected void remove(Object value) { + RedissonSetCache.this.remove(value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, String pattern, int count) { + return get(distributedScanIteratorAsync(iteratorName, pattern, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, String pattern, int count) { + List args = new ArrayList<>(2); + args.add(System.currentTimeMillis()); + if (pattern != null) { + args.add(pattern); + } + args.add(count); + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_ZSCAN, + "local cursor = redis.call('get', KEYS[2]); " + + "if cursor ~= false then " + + "cursor = tonumber(cursor); " + + "else " + + "cursor = 0;" + + "end;" + + "if cursor == -1 then " + + "return {0, {}}; " + + "end;" + + "local result; " + + "if (#ARGV == 3) then " + + "result = redis.call('zscan', KEYS[1], cursor, 'match', ARGV[2], 'count', ARGV[3]); " + + "else " + + "result = redis.call('zscan', KEYS[1], cursor, 'count', ARGV[2]); " + + "end;" + + "local next_cursor = result[1]" + + "if next_cursor ~= \"0\" then " + + "redis.call('setex', KEYS[2], 3600, next_cursor);" + + "else " + + "redis.call('setex', KEYS[2], 3600, -1);" + + "end; " + + "local res = {};" + + "for i, value in ipairs(result[2]) do " + + "if i % 2 == 0 then " + + "local expireDate = value; " + + "if tonumber(expireDate) > tonumber(ARGV[1]) then " + + "table.insert(res, result[2][i-1]); " + + "end; " + + "end; " + + "end;" + + "return {result[1], res};", + Arrays.asList(getRawName(), iteratorName), args.toArray()); + } + + @Override + public Set removeRandom(int amount) { + throw new UnsupportedOperationException(); + } + + @Override + public V removeRandom() { + throw new UnsupportedOperationException(); + } + + @Override + public V random() { + return get(randomAsync()); + } + + @Override + public Set random(int count) { + return get(randomAsync(count)); + } + + @Override + public boolean move(String destination, V member) { + throw new UnsupportedOperationException(); + } + + @Override + public int union(String... names) { + return get(unionAsync(names)); + } + + @Override + public Set readUnion(String... names) { + return get(readUnionAsync(names)); + } + + @Override + public int diff(String... names) { + return get(diffAsync(names)); + } + + @Override + public Set readDiff(String... names) { + return get(readDiffAsync(names)); + } + + @Override + public int intersection(String... names) { + return get(intersectionAsync(names)); + } + + @Override + public Set readIntersection(String... names) { + return get(readIntersectionAsync(names)); + } + + @Override + public Integer countIntersection(String... names) { + return get(countIntersectionAsync(names)); + } + + @Override + public Integer countIntersection(int limit, String... names) { + return get(countIntersectionAsync(limit, names)); + } + + @Override + public List containsEach(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture> removeRandomAsync(int amount) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture removeRandomAsync() { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture randomAsync() { + String tempName = prefixName("__redisson_cache_temp", getRawName()); + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_OBJECT, + "local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" + + "for i = 1, #values, 2 do " + + "redis.call('zadd', KEYS[2], values[i], values[i+1]); " + + "end;" + + "local res = redis.call('zrandmember', KEYS[2]); " + + "redis.call('del', KEYS[2]); " + + "return res;", + Arrays.asList(getRawName(), tempName), + System.currentTimeMillis(), 92233720368547758L); + } + + @Override + public RFuture> randomAsync(int count) { + String tempName = prefixName("__redisson_cache_temp", getRawName()); + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET, + "local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" + + "for i = 1, #values, 2 do " + + "redis.call('zadd', KEYS[2], values[i], values[i+1]); " + + "end;" + + "local res = redis.call('zrandmember', KEYS[2], ARGV[3]); " + + "redis.call('del', KEYS[2]); " + + "return res;", + Arrays.asList(getRawName(), tempName), + System.currentTimeMillis(), 92233720368547758L, count); + } + + @Override + public RFuture moveAsync(String destination, V member) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture unionAsync(String... names) { + List keys = new LinkedList<>(); + keys.add(getRawName()); + keys.addAll(Arrays.asList(names)); + for (Object key : names) { + String tempName = prefixName("__redisson_cache_temp", key.toString()); + keys.add(tempName); + } + + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + "local args = {KEYS[1], (#KEYS-1)/2};" + + "for i = 2, (#KEYS-1)/2 + 1, 1 do " + + "local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" + + "local k = (#KEYS-1)/2 + i; " + + "table.insert(args, KEYS[k]); " + + "for j = 1, #values, 2 do " + + "redis.call('zadd', KEYS[k], values[j+1], values[j]); " + + "end;" + + "end; " + + "table.insert(args, 'AGGREGATE'); " + + "table.insert(args, 'SUM'); " + + "local res = redis.call('zunionstore', unpack(args));" + + "redis.call('del', unpack(KEYS, (#KEYS-1)/2+2, #KEYS)); " + + "return res;", + keys, + System.currentTimeMillis(), 92233720368547758L, names.length+1); + } + + @Override + public RFuture> readUnionAsync(String... names) { + List keys = new LinkedList<>(); + keys.add(getRawName()); + keys.addAll(Arrays.asList(names)); + for (Object key : new ArrayList<>(keys)) { + String tempName = prefixName("__redisson_cache_temp", key.toString()); + keys.add(tempName); + } + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET, + "for i = 1, #KEYS, 1 do " + + "local values = redis.call('zrangebyscore', KEYS[ARGV[3] + i], ARGV[1], ARGV[2], 'WITHSCORES');" + + "for j = 1, #values, 2 do " + + "redis.call('zadd', KEYS[ARGV[3] + i], values[j], values[j+1]); " + + "end;" + + "end; " + + "local values = redis.call('zunion', ARGV[3], unpack(KEYS, ARGV[3], #ARGV), 'AGGREGATE', 'SUM');" + + "redis.call('del', unpack(KEYS, ARGV[3], #KEYS)); " + + "return values;", + keys, + System.currentTimeMillis(), 92233720368547758L, names.length+1); + } + + @Override + public RFuture diffAsync(String... names) { + List keys = new LinkedList<>(); + keys.add(getRawName()); + keys.addAll(Arrays.asList(names)); + for (Object key : names) { + String tempName = prefixName("__redisson_cache_temp", key.toString()); + keys.add(tempName); + } + + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + "local args = {KEYS[1], (#KEYS-1)/2};" + + "for i = 2, (#KEYS-1)/2 + 1, 1 do " + + "local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" + + "local k = (#KEYS-1)/2 + i; " + + "table.insert(args, KEYS[k]); " + + "for j = 1, #values, 2 do " + + "redis.call('zadd', KEYS[k], values[j+1], values[j]); " + + "end;" + + "end; " + + "local res = redis.call('zdiffstore', unpack(args));" + + "redis.call('del', unpack(KEYS, (#KEYS-1)/2+2, #KEYS)); " + + "return res;", + keys, + System.currentTimeMillis(), 92233720368547758L, names.length+1); + } + + @Override + public RFuture> readDiffAsync(String... names) { + List keys = new LinkedList<>(); + keys.add(getRawName()); + keys.addAll(Arrays.asList(names)); + for (Object key : new ArrayList<>(keys)) { + String tempName = prefixName("__redisson_cache_temp", key.toString()); + keys.add(tempName); + } + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET, + "for i = 1, #KEYS, 1 do " + + "local values = redis.call('zrangebyscore', KEYS[ARGV[3] + i], ARGV[1], ARGV[2], 'WITHSCORES');" + + "for j = 1, #values, 2 do " + + "redis.call('zadd', KEYS[ARGV[3] + i], values[j], values[j+1]); " + + "end;" + + "end; " + + "local values = redis.call('zdiff', ARGV[3], unpack(KEYS, ARGV[3], #ARGV));" + + "redis.call('del', unpack(KEYS, ARGV[3], #KEYS)); " + + "return values;", + keys, + System.currentTimeMillis(), 92233720368547758L, names.length+1); + } + + @Override + public RFuture intersectionAsync(String... names) { + List keys = new LinkedList<>(); + keys.add(getRawName()); + keys.addAll(Arrays.asList(names)); + for (Object key : names) { + String tempName = prefixName("__redisson_cache_temp", key.toString()); + keys.add(tempName); + } + + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + "local args = {KEYS[1], (#KEYS-1)/2};" + + "for i = 2, (#KEYS-1)/2 + 1, 1 do " + + "local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" + + "local k = (#KEYS-1)/2 + i; " + + "table.insert(args, KEYS[k]); " + + "for j = 1, #values, 2 do " + + "redis.call('zadd', KEYS[k], values[j+1], values[j]); " + + "end;" + + "end; " + + "table.insert(args, 'AGGREGATE'); " + + "table.insert(args, 'SUM'); " + + "local res = redis.call('zinterstore', unpack(args));" + + "redis.call('del', unpack(KEYS, (#KEYS-1)/2+2, #KEYS)); " + + "return res;", + keys, + System.currentTimeMillis(), 92233720368547758L, names.length+1); + } + + @Override + public RFuture> readIntersectionAsync(String... names) { + List keys = new LinkedList<>(); + keys.add(getRawName()); + keys.addAll(Arrays.asList(names)); + for (Object key : new ArrayList<>(keys)) { + String tempName = prefixName("__redisson_cache_temp", key.toString()); + keys.add(tempName); + } + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET, + "for i = 1, #KEYS, 1 do " + + "local values = redis.call('zrangebyscore', KEYS[ARGV[3] + i], ARGV[1], ARGV[2], 'WITHSCORES');" + + "for j = 1, #values, 2 do " + + "redis.call('zadd', KEYS[ARGV[3] + i], values[j], values[j+1]); " + + "end;" + + "end; " + + "local values = redis.call('zinter', ARGV[3], unpack(KEYS, ARGV[3], #ARGV), 'AGGREGATE', 'SUM');" + + "redis.call('del', unpack(KEYS, ARGV[3], #KEYS)); " + + "return values;", + keys, + System.currentTimeMillis(), 92233720368547758L, names.length+1); + } + + @Override + public RFuture countIntersectionAsync(String... names) { + return countIntersectionAsync(0, names); + } + + @Override + public RFuture countIntersectionAsync(int limit, String... names) { + List keys = new LinkedList<>(); + keys.add(getRawName()); + keys.addAll(Arrays.asList(names)); + for (Object key : new ArrayList<>(keys)) { + String tempName = prefixName("__redisson_cache_temp", key.toString()); + keys.add(tempName); + } + + return commandExecutor.evalWriteAsync(getRawName(), IntegerCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + "local args = {ARGV[3]};" + + "for i = 1, ARGV[3], 1 do " + + "local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" + + "local k = tonumber(ARGV[3]) + i; " + + "table.insert(args, KEYS[k]); " + + "for j = 1, #values, 2 do " + + "redis.call('zadd', KEYS[k], values[j+1], values[j]); " + + "end;" + + "end; " + + "table.insert(args, 'LIMIT'); " + + "table.insert(args, ARGV[4]); " + + "local res = redis.call('zintercard', unpack(args));" + + "redis.call('del', unpack(KEYS, ARGV[3]+1, #KEYS)); " + + "return res;", + keys, + System.currentTimeMillis(), 92233720368547758L, names.length+1, limit); + } + + @Override + public RFuture addAllCountedAsync(Collection c) { + if (c.isEmpty()) { + return new CompletableFutureWrapper<>(0); + } + + List args = new ArrayList<>(c.size() + 1); + args.add(getRawName()); + for (V v : c) { + args.add(92233720368547758L); + try { + args.add(v); + } catch (Exception e) { + args.forEach(vv -> { + ReferenceCountUtil.safeRelease(vv); + }); + throw e; + } + } + + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZADD_INT, args.toArray()); + } + + @Override + public RFuture removeAllCountedAsync(Collection c) { + if (c.isEmpty()) { + return new CompletableFutureWrapper<>(0); + } + + List args = new ArrayList<>(c.size() + 1); + args.add(getRawName()); + encode(args, c); + + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZREM_INT, args.toArray()); + } + + @Override + public RFuture> containsEachAsync(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public Set readSort(SortOrder order) { + return get(readSortAsync(order)); + } + + @Override + public Set readSort(SortOrder order, int offset, int count) { + return get(readSortAsync(order, offset, count)); + } + + @Override + public Set readSort(String byPattern, SortOrder order) { + return get(readSortAsync(byPattern, order)); + } + + @Override + public Set readSort(String byPattern, SortOrder order, int offset, int count) { + return get(readSortAsync(byPattern, order, offset, count)); + } + + @Override + public Collection readSort(String byPattern, List getPatterns, SortOrder order) { + return get(readSortAsync(byPattern, getPatterns, order)); + } + + @Override + public Collection readSort(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return get(readSortAsync(byPattern, getPatterns, order, offset, count)); + } + + @Override + public Set readSortAlpha(SortOrder order) { + return get(readSortAlphaAsync(order)); + } + + @Override + public Set readSortAlpha(SortOrder order, int offset, int count) { + return get(readSortAlphaAsync(order, offset, count)); + } + + @Override + public Set readSortAlpha(String byPattern, SortOrder order) { + return get(readSortAlphaAsync(byPattern, order)); + } + + @Override + public Set readSortAlpha(String byPattern, SortOrder order, int offset, int count) { + return get(readSortAlphaAsync(byPattern, order, offset, count)); + } + + @Override + public Collection readSortAlpha(String byPattern, List getPatterns, SortOrder order) { + return get(readSortAlphaAsync(byPattern, getPatterns, order)); + } + + @Override + public Collection readSortAlpha(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return get(readSortAlphaAsync(byPattern, getPatterns, order, offset, count)); + } + + @Override + public int sortTo(String destName, SortOrder order) { + return get(sortToAsync(destName, order)); + } + + @Override + public int sortTo(String destName, SortOrder order, int offset, int count) { + return get(sortToAsync(destName, order, offset, count)); + } + + @Override + public int sortTo(String destName, String byPattern, SortOrder order) { + return get(sortToAsync(destName, byPattern, order)); + } + + @Override + public int sortTo(String destName, String byPattern, SortOrder order, int offset, int count) { + return get(sortToAsync(destName, byPattern, order, offset, count)); + } + + @Override + public int sortTo(String destName, String byPattern, List getPatterns, SortOrder order) { + return get(sortToAsync(destName, byPattern, getPatterns, order)); + } + + @Override + public int sortTo(String destName, String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return get(sortToAsync(destName, byPattern, getPatterns, order, offset, count)); + } + + @Override + public RFuture> readSortAsync(SortOrder order) { + return readSortAsync(null, null, order, -1, -1, false); + } + + @Override + public RFuture> readSortAsync(SortOrder order, int offset, int count) { + return readSortAsync(null, null, order, offset, count, false); + } + + @Override + public RFuture> readSortAsync(String byPattern, SortOrder order) { + return readSortAsync(byPattern, null, order, -1, -1, false); + } + + @Override + public RFuture> readSortAsync(String byPattern, SortOrder order, int offset, int count) { + return readSortAsync(byPattern, null, order, offset, count, false); + } + + @Override + public RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order) { + return readSortAsync(byPattern, getPatterns, order, -1, -1); + } + + @Override + public RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return readSortAsync(byPattern, getPatterns, order, offset, count, false); + } + + private RFuture readSortAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count, boolean alpha) { + throw new UnsupportedOperationException(); +// List params = new ArrayList<>(); +// params.add(System.currentTimeMillis()); +// params.add(92233720368547758L); +// if (byPattern != null) { +// params.add("BY"); +// params.add(byPattern); +// } +// if (offset != -1 && count != -1) { +// params.add("LIMIT"); +// } +// if (offset != -1) { +// params.add(offset); +// } +// if (count != -1) { +// params.add(count); +// } +// if (getPatterns != null) { +// for (String pattern : getPatterns) { +// params.add("GET"); +// params.add(pattern); +// } +// } +// if (alpha) { +// params.add("ALPHA"); +// } +// if (order != null) { +// params.add(order); +// } +// +// String tempName = prefixName("__redisson_cache_temp", getRawName()); +// return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET, +// "local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" + +// "for i = 1, #values, 2 do " +// + "redis.call('zadd', KEYS[2], values[i], values[i+1]); " + +// "end;" + +// "local res = redis.call('sort', KEYS[2], unpack(ARGV, 3, #ARGV)); " + +// "redis.call('del', KEYS[2]); " + +// "return res;", +// Arrays.asList(getRawName(), tempName), params.toArray()); + } + + @Override + public RFuture> readSortAlphaAsync(SortOrder order) { + return readSortAsync(null, null, order, -1, -1, true); + } + + @Override + public RFuture> readSortAlphaAsync(SortOrder order, int offset, int count) { + return readSortAsync(null, null, order, offset, count, true); + } + + @Override + public RFuture> readSortAlphaAsync(String byPattern, SortOrder order) { + return readSortAsync(byPattern, null, order, -1, -1, true); + } + + @Override + public RFuture> readSortAlphaAsync(String byPattern, SortOrder order, int offset, int count) { + return readSortAsync(byPattern, null, order, offset, count, true); + } + + @Override + public RFuture> readSortAlphaAsync(String byPattern, List getPatterns, SortOrder order) { + return readSortAsync(byPattern, getPatterns, order, -1, -1, true); + } + + @Override + public RFuture> readSortAlphaAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return readSortAsync(byPattern, getPatterns, order, offset, count, true); + } + + @Override + public RFuture sortToAsync(String destName, SortOrder order) { + return sortToAsync(destName, null, null, order, -1, -1); + } + + @Override + public RFuture sortToAsync(String destName, SortOrder order, int offset, int count) { + return sortToAsync(destName, null, null, order, offset, count); + } + + @Override + public RFuture sortToAsync(String destName, String byPattern, SortOrder order) { + return sortToAsync(destName, byPattern, null, order, -1, -1); + } + + @Override + public RFuture sortToAsync(String destName, String byPattern, SortOrder order, int offset, int count) { + return sortToAsync(destName, byPattern, null, order, offset, count); + } + + @Override + public RFuture sortToAsync(String destName, String byPattern, List getPatterns, SortOrder order) { + return sortToAsync(destName, byPattern, getPatterns, order, -1, -1); + } + + @Override + public RFuture sortToAsync(String destName, String byPattern, List getPatterns, SortOrder order, int offset, int count) { + throw new UnsupportedOperationException(); +// List params = new ArrayList<>(); +// params.add(System.currentTimeMillis()); +// params.add(92233720368547758L); +// String tempName = prefixName("__redisson_cache_temp", getRawName()); +// params.add(tempName); +// if (byPattern != null) { +// params.add("BY"); +// params.add(byPattern); +// } +// if (offset != -1 && count != -1) { +// params.add("LIMIT"); +// } +// if (offset != -1) { +// params.add(offset); +// } +// if (count != -1) { +// params.add(count); +// } +// if (getPatterns != null) { +// for (String pattern : getPatterns) { +// params.add("GET"); +// params.add(pattern); +// } +// } +// params.add(order); +// params.add("STORE"); +// params.add(destName); +// +// return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, +// "local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" + +// "for i = 1, #values, 2 do " +// + "redis.call('zadd', KEYS[2], values[i], values[i+1]); " + +// "end;" + +// "local res = redis.call('sort', unpack(ARGV, 3, #ARGV)); " + +// "redis.call('del', KEYS[2]); " + +// "return res;", +// Arrays.asList(getRawName(), tempName), params.toArray()); + } } diff --git a/redisson/src/main/java/org/redisson/api/RSetCache.java b/redisson/src/main/java/org/redisson/api/RSetCache.java index 399bc9ba4..50a8bfdd3 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCache.java +++ b/redisson/src/main/java/org/redisson/api/RSetCache.java @@ -39,121 +39,8 @@ import org.redisson.api.mapreduce.RCollectionMapReduce; * * @param value */ -public interface RSetCache extends Set, RExpirable, RSetCacheAsync, RDestroyable { +public interface RSetCache extends RSet, RExpirable, RSetCacheAsync, RDestroyable { - /** - * Returns RCountDownLatch instance associated with value - * - * @param value - set value - * @return RCountDownLatch object - */ - RCountDownLatch getCountDownLatch(V value); - - /** - * Returns RPermitExpirableSemaphore instance associated with value - * - * @param value - set value - * @return RPermitExpirableSemaphore object - */ - RPermitExpirableSemaphore getPermitExpirableSemaphore(V value); - - /** - * Returns RSemaphore instance associated with value - * - * @param value - set value - * @return RSemaphore object - */ - RSemaphore getSemaphore(V value); - - /** - * Returns RLock instance associated with value - * - * @param value - set value - * @return RLock object - */ - RLock getFairLock(V value); - - /** - * Returns RReadWriteLock instance associated with value - * - * @param value - set value - * @return RReadWriteLock object - */ - RReadWriteLock getReadWriteLock(V value); - - /** - * Returns lock instance associated with value - * - * @param value - set value - * @return RLock object - */ - RLock getLock(V value); - - /** - * Returns stream of elements in this set. - * Elements are loaded in batch. Batch size is defined by count param. - * - * @param count - size of elements batch - * @return stream of elements - */ - Stream stream(int count); - - /** - * Returns stream of elements in this set. - * Elements are loaded in batch. Batch size is defined by count param. - * If pattern is not null then only elements match this pattern are loaded. - * - * @param pattern - search pattern - * @param count - size of elements batch - * @return stream of elements - */ - Stream stream(String pattern, int count); - - /** - * Returns stream of elements in this set matches pattern. - * - * @param pattern - search pattern - * @return stream of elements - */ - Stream stream(String pattern); - - /** - * Returns an iterator over elements in this set. - * Elements are loaded in batch. Batch size is defined by count param. - * - * @param count - size of elements batch - * @return iterator - */ - Iterator iterator(int count); - - /** - * Returns an iterator over elements in this set. - * Elements are loaded in batch. Batch size is defined by count param. - * If pattern is not null then only elements match this pattern are loaded. - * - * @param pattern - search pattern - * @param count - size of elements batch - * @return iterator - */ - Iterator iterator(String pattern, int count); - - /** - * Returns values iterator matches pattern. - * - * @param pattern - search pattern - * @return iterator - */ - Iterator iterator(String pattern); - - /** - * Returns RMapReduce object associated with this map - * - * @param output key - * @param output value - * @return MapReduce instance - */ - RCollectionMapReduce mapReduce(); - /** * Stores value with specified time to live. * Value expires after specified time to live. @@ -177,22 +64,6 @@ public interface RSetCache extends Set, RExpirable, RSetCacheAsync, RDe @Override int size(); - /** - * Read all elements at once - * - * @return values - */ - Set readAll(); - - /** - * Tries to add elements only if none of them in set. - * - * @param values - values to add - * @return true if elements successfully added, - * otherwise false. - */ - boolean tryAdd(V... values); - /** * Tries to add elements only if none of them in set. * diff --git a/redisson/src/main/java/org/redisson/api/RSetCacheAsync.java b/redisson/src/main/java/org/redisson/api/RSetCacheAsync.java index 83644339c..1bdfe89f7 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCacheAsync.java +++ b/redisson/src/main/java/org/redisson/api/RSetCacheAsync.java @@ -15,7 +15,6 @@ */ package org.redisson.api; -import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -25,7 +24,7 @@ import java.util.concurrent.TimeUnit; * * @param value */ -public interface RSetCacheAsync extends RCollectionAsync { +public interface RSetCacheAsync extends RSetAsync { /** * Stores value with specified time to live. @@ -50,22 +49,6 @@ public interface RSetCacheAsync extends RCollectionAsync { @Override RFuture sizeAsync(); - /** - * Read all elements at once - * - * @return values - */ - RFuture> readAllAsync(); - - /** - * Tries to add elements only if none of them in set. - * - * @param values - values to add - * @return true if elements successfully added, - * otherwise false. - */ - RFuture tryAddAsync(V... values); - /** * Tries to add elements only if none of them in set. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index e05fbe5c2..1892c35c6 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -94,6 +94,7 @@ public interface RedisCommands { RedisCommand ZADD_RAW = new RedisCommand("ZADD"); RedisStrictCommand ZADD_INT = new RedisStrictCommand("ZADD", new IntegerReplayConvertor()); RedisCommand ZADD = new RedisCommand("ZADD"); + RedisStrictCommand ZREM_INT = new RedisStrictCommand<>("ZREM", new IntegerReplayConvertor()); RedisStrictCommand ZREM_LONG = new RedisStrictCommand("ZREM"); RedisCommand ZREM = new RedisCommand("ZREM", new BooleanAmountReplayConvertor()); RedisStrictCommand ZCARD_INT = new RedisStrictCommand("ZCARD", new IntegerReplayConvertor()); diff --git a/redisson/src/test/java/org/redisson/RedissonSetCacheTest.java b/redisson/src/test/java/org/redisson/RedissonSetCacheTest.java index 3aef8cc50..a141fbf6e 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetCacheTest.java @@ -1,19 +1,5 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.Serializable; -import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - import org.joor.Reflect; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -21,6 +7,14 @@ import org.redisson.api.RSetCache; import org.redisson.client.codec.IntegerCodec; import org.redisson.eviction.EvictionScheduler; +import java.io.Serializable; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + public class RedissonSetCacheTest extends BaseTest { public static class SimpleBean implements Serializable { @@ -513,7 +507,97 @@ public class RedissonSetCacheTest extends BaseTest { Assertions.assertEquals(0, cache.size()); cache.destroy(); + } + + @Test + public void testUnion() throws InterruptedException { + redisson.getKeys().flushall(); + RSetCache cache1 = redisson.getSetCache("cache1", IntegerCodec.INSTANCE); + cache1.add(1); + cache1.add(2, 1, TimeUnit.SECONDS); + cache1.add(5, 1, TimeUnit.SECONDS); + cache1.add(3); + + RSetCache cache2 = redisson.getSetCache("cache2", IntegerCodec.INSTANCE); + cache2.add(4); + cache2.add(2, 1, TimeUnit.SECONDS); + cache2.add(5, 1, TimeUnit.SECONDS); + cache2.add(7); + + + RSetCache cache3 = redisson.getSetCache("cache3", IntegerCodec.INSTANCE); + assertThat(cache3.union("cache1", "cache2")).isEqualTo(6); + assertThat(cache3).containsExactlyInAnyOrder(1, 3, 2, 5, 4, 7); + cache3.clear(); + + Thread.sleep(1500); + assertThat(cache3.union("cache1", "cache2")).isEqualTo(4); + assertThat(cache3).containsExactlyInAnyOrder(1, 3, 4, 7); + + assertThat(redisson.getKeys().getKeys()).containsExactlyInAnyOrder("cache1", "cache2", "cache3"); + } + + @Test + public void testDiff() throws InterruptedException { + redisson.getKeys().flushall(); + RSetCache cache1 = redisson.getSetCache("cache1", IntegerCodec.INSTANCE); + cache1.add(1); + cache1.add(2, 1, TimeUnit.SECONDS); + cache1.add(5, 1, TimeUnit.SECONDS); + cache1.add(3, 1, TimeUnit.SECONDS); + + RSetCache cache2 = redisson.getSetCache("cache2", IntegerCodec.INSTANCE); + cache2.add(4); + cache2.add(2, 1, TimeUnit.SECONDS); + cache2.add(5, 1, TimeUnit.SECONDS); + cache2.add(7); + + + RSetCache cache3 = redisson.getSetCache("cache3", IntegerCodec.INSTANCE); + assertThat(cache3.diff("cache1", "cache2")).isEqualTo(2); + assertThat(cache3).containsExactlyInAnyOrder(1, 3); + cache3.clear(); + + Thread.sleep(1500); + + assertThat(cache3.diff("cache1", "cache2")).isEqualTo(1); + assertThat(cache3).containsExactlyInAnyOrder(1); + + assertThat(redisson.getKeys().getKeys()).containsExactlyInAnyOrder("cache1", "cache2", "cache3"); + } + + @Test + public void testIntersection() throws InterruptedException { + redisson.getKeys().flushall(); + RSetCache cache1 = redisson.getSetCache("cache1"); + cache1.add(1); + cache1.add(2, 1, TimeUnit.SECONDS); + cache1.add(5, 1, TimeUnit.SECONDS); + cache1.add(3); + + RSetCache cache2 = redisson.getSetCache("cache2"); + cache2.add(4); + cache2.add(2, 1, TimeUnit.SECONDS); + cache2.add(5, 1, TimeUnit.SECONDS); + cache2.add(7); + + + assertThat(cache1.countIntersection("cache2")).isEqualTo(2); + RSetCache cache3 = redisson.getSetCache("cache3"); + assertThat(cache3.intersection("cache1", "cache2")).isEqualTo(2); + assertThat(cache3).containsExactlyInAnyOrder(2, 5); + cache3.clear(); + + Thread.sleep(1500); + + assertThat(cache1.countIntersection("cache2")).isEqualTo(0); + assertThat(cache3.intersection("cache1", "cache2")).isEqualTo(0); + assertThat(cache3).isEmpty(); + + assertThat(redisson.getKeys().count()).isEqualTo(2); + assertThat(redisson.getKeys().getKeys()).containsExactlyInAnyOrder("cache1", "cache2"); } + }