Feature - RSetCache extends RSet interface. #4954

pull/4966/head
Nikita Koksharov 2 years ago
parent 8fa9b472b1
commit 1e0a1f16e6

@ -16,10 +16,13 @@
package org.redisson;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
@ -51,8 +54,9 @@ import java.util.stream.Stream;
*/
public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<V>, ScanIterator {
RedissonClient redisson;
EvictionScheduler evictionScheduler;
final RedissonClient redisson;
final EvictionScheduler evictionScheduler;
final RedissonScoredSortedSet<V> scoredSortedSet;
public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
@ -61,6 +65,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
this.evictionScheduler = evictionScheduler;
this.redisson = redisson;
this.scoredSortedSet = new RedissonScoredSortedSet<V>(commandExecutor, name, redisson);
}
public RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
@ -70,6 +75,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
this.evictionScheduler = evictionScheduler;
this.redisson = redisson;
this.scoredSortedSet = new RedissonScoredSortedSet<V>(codec, commandExecutor, name, redisson);
}
@Override
@ -79,12 +85,16 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override
public int size() {
return get(sizeAsync());
return scoredSortedSet.size();
}
@Override
public RFuture<Integer> sizeAsync() {
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.ZCARD_INT, getRawName());
return commandExecutor.evalReadAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2]);" +
"return #values;",
Arrays.asList(getRawName()),
System.currentTimeMillis(), 92233720368547758L);
}
@Override
@ -146,7 +156,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
+ "end; "
+ "end;"
+ "end;"
+ "return {res[1], result};", Arrays.<Object>asList(name), params.toArray());
+ "return {res[1], result};", Arrays.asList(name), params.toArray());
}
@Override
@ -160,7 +170,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
@Override
public Iterator<V> iterator(final String pattern, final int count) {
public Iterator<V> iterator(String pattern, int count) {
return new RedissonBaseIterator<V>() {
@Override
@ -459,4 +469,695 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return toStream(iterator(pattern));
}
@Override
public int addAllCounted(Collection<? extends V> c) {
return get(addAllCountedAsync(c));
}
@Override
public int removeAllCounted(Collection<? extends V> c) {
return get(removeAllCountedAsync(c));
}
@Override
public Iterator<V> distributedIterator(String pattern) {
String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, pattern, 10);
}
@Override
public Iterator<V> distributedIterator(int count) {
String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, null, count);
}
@Override
public Iterator<V> distributedIterator(String iteratorName, String pattern, int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return distributedScanIterator(iteratorName, pattern, count);
}
@Override
protected void remove(Object value) {
RedissonSetCache.this.remove(value);
}
};
}
private ScanResult<Object> distributedScanIterator(String iteratorName, String pattern, int count) {
return get(distributedScanIteratorAsync(iteratorName, pattern, count));
}
private RFuture<ScanResult<Object>> distributedScanIteratorAsync(String iteratorName, String pattern, int count) {
List<Object> args = new ArrayList<>(2);
args.add(System.currentTimeMillis());
if (pattern != null) {
args.add(pattern);
}
args.add(count);
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_ZSCAN,
"local cursor = redis.call('get', KEYS[2]); "
+ "if cursor ~= false then "
+ "cursor = tonumber(cursor); "
+ "else "
+ "cursor = 0;"
+ "end;"
+ "if cursor == -1 then "
+ "return {0, {}}; "
+ "end;"
+ "local result; "
+ "if (#ARGV == 3) then "
+ "result = redis.call('zscan', KEYS[1], cursor, 'match', ARGV[2], 'count', ARGV[3]); "
+ "else "
+ "result = redis.call('zscan', KEYS[1], cursor, 'count', ARGV[2]); "
+ "end;"
+ "local next_cursor = result[1]"
+ "if next_cursor ~= \"0\" then "
+ "redis.call('setex', KEYS[2], 3600, next_cursor);"
+ "else "
+ "redis.call('setex', KEYS[2], 3600, -1);"
+ "end; "
+ "local res = {};"
+ "for i, value in ipairs(result[2]) do "
+ "if i % 2 == 0 then "
+ "local expireDate = value; "
+ "if tonumber(expireDate) > tonumber(ARGV[1]) then "
+ "table.insert(res, result[2][i-1]); "
+ "end; "
+ "end; "
+ "end;"
+ "return {result[1], res};",
Arrays.asList(getRawName(), iteratorName), args.toArray());
}
@Override
public Set<V> removeRandom(int amount) {
throw new UnsupportedOperationException();
}
@Override
public V removeRandom() {
throw new UnsupportedOperationException();
}
@Override
public V random() {
return get(randomAsync());
}
@Override
public Set<V> random(int count) {
return get(randomAsync(count));
}
@Override
public boolean move(String destination, V member) {
throw new UnsupportedOperationException();
}
@Override
public int union(String... names) {
return get(unionAsync(names));
}
@Override
public Set<V> readUnion(String... names) {
return get(readUnionAsync(names));
}
@Override
public int diff(String... names) {
return get(diffAsync(names));
}
@Override
public Set<V> readDiff(String... names) {
return get(readDiffAsync(names));
}
@Override
public int intersection(String... names) {
return get(intersectionAsync(names));
}
@Override
public Set<V> readIntersection(String... names) {
return get(readIntersectionAsync(names));
}
@Override
public Integer countIntersection(String... names) {
return get(countIntersectionAsync(names));
}
@Override
public Integer countIntersection(int limit, String... names) {
return get(countIntersectionAsync(limit, names));
}
@Override
public List<V> containsEach(Collection<V> c) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Set<V>> removeRandomAsync(int amount) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<V> removeRandomAsync() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<V> randomAsync() {
String tempName = prefixName("__redisson_cache_temp", getRawName());
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_OBJECT,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" +
"for i = 1, #values, 2 do "
+ "redis.call('zadd', KEYS[2], values[i], values[i+1]); " +
"end;" +
"local res = redis.call('zrandmember', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"return res;",
Arrays.asList(getRawName(), tempName),
System.currentTimeMillis(), 92233720368547758L);
}
@Override
public RFuture<Set<V>> randomAsync(int count) {
String tempName = prefixName("__redisson_cache_temp", getRawName());
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" +
"for i = 1, #values, 2 do "
+ "redis.call('zadd', KEYS[2], values[i], values[i+1]); " +
"end;" +
"local res = redis.call('zrandmember', KEYS[2], ARGV[3]); " +
"redis.call('del', KEYS[2]); " +
"return res;",
Arrays.asList(getRawName(), tempName),
System.currentTimeMillis(), 92233720368547758L, count);
}
@Override
public RFuture<Boolean> moveAsync(String destination, V member) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Integer> unionAsync(String... names) {
List<Object> keys = new LinkedList<>();
keys.add(getRawName());
keys.addAll(Arrays.asList(names));
for (Object key : names) {
String tempName = prefixName("__redisson_cache_temp", key.toString());
keys.add(tempName);
}
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local args = {KEYS[1], (#KEYS-1)/2};" +
"for i = 2, (#KEYS-1)/2 + 1, 1 do " +
"local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" +
"local k = (#KEYS-1)/2 + i; " +
"table.insert(args, KEYS[k]); " +
"for j = 1, #values, 2 do " +
"redis.call('zadd', KEYS[k], values[j+1], values[j]); " +
"end;" +
"end; " +
"table.insert(args, 'AGGREGATE'); " +
"table.insert(args, 'SUM'); " +
"local res = redis.call('zunionstore', unpack(args));" +
"redis.call('del', unpack(KEYS, (#KEYS-1)/2+2, #KEYS)); " +
"return res;",
keys,
System.currentTimeMillis(), 92233720368547758L, names.length+1);
}
@Override
public RFuture<Set<V>> readUnionAsync(String... names) {
List<Object> keys = new LinkedList<>();
keys.add(getRawName());
keys.addAll(Arrays.asList(names));
for (Object key : new ArrayList<>(keys)) {
String tempName = prefixName("__redisson_cache_temp", key.toString());
keys.add(tempName);
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET,
"for i = 1, #KEYS, 1 do " +
"local values = redis.call('zrangebyscore', KEYS[ARGV[3] + i], ARGV[1], ARGV[2], 'WITHSCORES');" +
"for j = 1, #values, 2 do "
+ "redis.call('zadd', KEYS[ARGV[3] + i], values[j], values[j+1]); " +
"end;" +
"end; " +
"local values = redis.call('zunion', ARGV[3], unpack(KEYS, ARGV[3], #ARGV), 'AGGREGATE', 'SUM');" +
"redis.call('del', unpack(KEYS, ARGV[3], #KEYS)); " +
"return values;",
keys,
System.currentTimeMillis(), 92233720368547758L, names.length+1);
}
@Override
public RFuture<Integer> diffAsync(String... names) {
List<Object> keys = new LinkedList<>();
keys.add(getRawName());
keys.addAll(Arrays.asList(names));
for (Object key : names) {
String tempName = prefixName("__redisson_cache_temp", key.toString());
keys.add(tempName);
}
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local args = {KEYS[1], (#KEYS-1)/2};" +
"for i = 2, (#KEYS-1)/2 + 1, 1 do " +
"local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" +
"local k = (#KEYS-1)/2 + i; " +
"table.insert(args, KEYS[k]); " +
"for j = 1, #values, 2 do " +
"redis.call('zadd', KEYS[k], values[j+1], values[j]); " +
"end;" +
"end; " +
"local res = redis.call('zdiffstore', unpack(args));" +
"redis.call('del', unpack(KEYS, (#KEYS-1)/2+2, #KEYS)); " +
"return res;",
keys,
System.currentTimeMillis(), 92233720368547758L, names.length+1);
}
@Override
public RFuture<Set<V>> readDiffAsync(String... names) {
List<Object> keys = new LinkedList<>();
keys.add(getRawName());
keys.addAll(Arrays.asList(names));
for (Object key : new ArrayList<>(keys)) {
String tempName = prefixName("__redisson_cache_temp", key.toString());
keys.add(tempName);
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET,
"for i = 1, #KEYS, 1 do " +
"local values = redis.call('zrangebyscore', KEYS[ARGV[3] + i], ARGV[1], ARGV[2], 'WITHSCORES');" +
"for j = 1, #values, 2 do "
+ "redis.call('zadd', KEYS[ARGV[3] + i], values[j], values[j+1]); " +
"end;" +
"end; " +
"local values = redis.call('zdiff', ARGV[3], unpack(KEYS, ARGV[3], #ARGV));" +
"redis.call('del', unpack(KEYS, ARGV[3], #KEYS)); " +
"return values;",
keys,
System.currentTimeMillis(), 92233720368547758L, names.length+1);
}
@Override
public RFuture<Integer> intersectionAsync(String... names) {
List<Object> keys = new LinkedList<>();
keys.add(getRawName());
keys.addAll(Arrays.asList(names));
for (Object key : names) {
String tempName = prefixName("__redisson_cache_temp", key.toString());
keys.add(tempName);
}
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local args = {KEYS[1], (#KEYS-1)/2};" +
"for i = 2, (#KEYS-1)/2 + 1, 1 do " +
"local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" +
"local k = (#KEYS-1)/2 + i; " +
"table.insert(args, KEYS[k]); " +
"for j = 1, #values, 2 do " +
"redis.call('zadd', KEYS[k], values[j+1], values[j]); " +
"end;" +
"end; " +
"table.insert(args, 'AGGREGATE'); " +
"table.insert(args, 'SUM'); " +
"local res = redis.call('zinterstore', unpack(args));" +
"redis.call('del', unpack(KEYS, (#KEYS-1)/2+2, #KEYS)); " +
"return res;",
keys,
System.currentTimeMillis(), 92233720368547758L, names.length+1);
}
@Override
public RFuture<Set<V>> readIntersectionAsync(String... names) {
List<Object> keys = new LinkedList<>();
keys.add(getRawName());
keys.addAll(Arrays.asList(names));
for (Object key : new ArrayList<>(keys)) {
String tempName = prefixName("__redisson_cache_temp", key.toString());
keys.add(tempName);
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET,
"for i = 1, #KEYS, 1 do " +
"local values = redis.call('zrangebyscore', KEYS[ARGV[3] + i], ARGV[1], ARGV[2], 'WITHSCORES');" +
"for j = 1, #values, 2 do "
+ "redis.call('zadd', KEYS[ARGV[3] + i], values[j], values[j+1]); " +
"end;" +
"end; " +
"local values = redis.call('zinter', ARGV[3], unpack(KEYS, ARGV[3], #ARGV), 'AGGREGATE', 'SUM');" +
"redis.call('del', unpack(KEYS, ARGV[3], #KEYS)); " +
"return values;",
keys,
System.currentTimeMillis(), 92233720368547758L, names.length+1);
}
@Override
public RFuture<Integer> countIntersectionAsync(String... names) {
return countIntersectionAsync(0, names);
}
@Override
public RFuture<Integer> countIntersectionAsync(int limit, String... names) {
List<Object> keys = new LinkedList<>();
keys.add(getRawName());
keys.addAll(Arrays.asList(names));
for (Object key : new ArrayList<>(keys)) {
String tempName = prefixName("__redisson_cache_temp", key.toString());
keys.add(tempName);
}
return commandExecutor.evalWriteAsync(getRawName(), IntegerCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local args = {ARGV[3]};" +
"for i = 1, ARGV[3], 1 do " +
"local values = redis.call('zrangebyscore', KEYS[i], ARGV[1], ARGV[2], 'WITHSCORES');" +
"local k = tonumber(ARGV[3]) + i; " +
"table.insert(args, KEYS[k]); " +
"for j = 1, #values, 2 do " +
"redis.call('zadd', KEYS[k], values[j+1], values[j]); " +
"end;" +
"end; " +
"table.insert(args, 'LIMIT'); " +
"table.insert(args, ARGV[4]); " +
"local res = redis.call('zintercard', unpack(args));" +
"redis.call('del', unpack(KEYS, ARGV[3]+1, #KEYS)); " +
"return res;",
keys,
System.currentTimeMillis(), 92233720368547758L, names.length+1, limit);
}
@Override
public RFuture<Integer> addAllCountedAsync(Collection<? extends V> c) {
if (c.isEmpty()) {
return new CompletableFutureWrapper<>(0);
}
List<Object> args = new ArrayList<>(c.size() + 1);
args.add(getRawName());
for (V v : c) {
args.add(92233720368547758L);
try {
args.add(v);
} catch (Exception e) {
args.forEach(vv -> {
ReferenceCountUtil.safeRelease(vv);
});
throw e;
}
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZADD_INT, args.toArray());
}
@Override
public RFuture<Integer> removeAllCountedAsync(Collection<? extends V> c) {
if (c.isEmpty()) {
return new CompletableFutureWrapper<>(0);
}
List<Object> args = new ArrayList<>(c.size() + 1);
args.add(getRawName());
encode(args, c);
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZREM_INT, args.toArray());
}
@Override
public RFuture<List<V>> containsEachAsync(Collection<V> c) {
throw new UnsupportedOperationException();
}
@Override
public Set<V> readSort(SortOrder order) {
return get(readSortAsync(order));
}
@Override
public Set<V> readSort(SortOrder order, int offset, int count) {
return get(readSortAsync(order, offset, count));
}
@Override
public Set<V> readSort(String byPattern, SortOrder order) {
return get(readSortAsync(byPattern, order));
}
@Override
public Set<V> readSort(String byPattern, SortOrder order, int offset, int count) {
return get(readSortAsync(byPattern, order, offset, count));
}
@Override
public <T> Collection<T> readSort(String byPattern, List<String> getPatterns, SortOrder order) {
return get(readSortAsync(byPattern, getPatterns, order));
}
@Override
public <T> Collection<T> readSort(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
return get(readSortAsync(byPattern, getPatterns, order, offset, count));
}
@Override
public Set<V> readSortAlpha(SortOrder order) {
return get(readSortAlphaAsync(order));
}
@Override
public Set<V> readSortAlpha(SortOrder order, int offset, int count) {
return get(readSortAlphaAsync(order, offset, count));
}
@Override
public Set<V> readSortAlpha(String byPattern, SortOrder order) {
return get(readSortAlphaAsync(byPattern, order));
}
@Override
public Set<V> readSortAlpha(String byPattern, SortOrder order, int offset, int count) {
return get(readSortAlphaAsync(byPattern, order, offset, count));
}
@Override
public <T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order) {
return get(readSortAlphaAsync(byPattern, getPatterns, order));
}
@Override
public <T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
return get(readSortAlphaAsync(byPattern, getPatterns, order, offset, count));
}
@Override
public int sortTo(String destName, SortOrder order) {
return get(sortToAsync(destName, order));
}
@Override
public int sortTo(String destName, SortOrder order, int offset, int count) {
return get(sortToAsync(destName, order, offset, count));
}
@Override
public int sortTo(String destName, String byPattern, SortOrder order) {
return get(sortToAsync(destName, byPattern, order));
}
@Override
public int sortTo(String destName, String byPattern, SortOrder order, int offset, int count) {
return get(sortToAsync(destName, byPattern, order, offset, count));
}
@Override
public int sortTo(String destName, String byPattern, List<String> getPatterns, SortOrder order) {
return get(sortToAsync(destName, byPattern, getPatterns, order));
}
@Override
public int sortTo(String destName, String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
return get(sortToAsync(destName, byPattern, getPatterns, order, offset, count));
}
@Override
public RFuture<Set<V>> readSortAsync(SortOrder order) {
return readSortAsync(null, null, order, -1, -1, false);
}
@Override
public RFuture<Set<V>> readSortAsync(SortOrder order, int offset, int count) {
return readSortAsync(null, null, order, offset, count, false);
}
@Override
public RFuture<Set<V>> readSortAsync(String byPattern, SortOrder order) {
return readSortAsync(byPattern, null, order, -1, -1, false);
}
@Override
public RFuture<Set<V>> readSortAsync(String byPattern, SortOrder order, int offset, int count) {
return readSortAsync(byPattern, null, order, offset, count, false);
}
@Override
public <T> RFuture<Collection<T>> readSortAsync(String byPattern, List<String> getPatterns, SortOrder order) {
return readSortAsync(byPattern, getPatterns, order, -1, -1);
}
@Override
public <T> RFuture<Collection<T>> readSortAsync(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
return readSortAsync(byPattern, getPatterns, order, offset, count, false);
}
private <T> RFuture<T> readSortAsync(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count, boolean alpha) {
throw new UnsupportedOperationException();
// List<Object> params = new ArrayList<>();
// params.add(System.currentTimeMillis());
// params.add(92233720368547758L);
// if (byPattern != null) {
// params.add("BY");
// params.add(byPattern);
// }
// if (offset != -1 && count != -1) {
// params.add("LIMIT");
// }
// if (offset != -1) {
// params.add(offset);
// }
// if (count != -1) {
// params.add(count);
// }
// if (getPatterns != null) {
// for (String pattern : getPatterns) {
// params.add("GET");
// params.add(pattern);
// }
// }
// if (alpha) {
// params.add("ALPHA");
// }
// if (order != null) {
// params.add(order);
// }
//
// String tempName = prefixName("__redisson_cache_temp", getRawName());
// return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SET,
// "local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" +
// "for i = 1, #values, 2 do "
// + "redis.call('zadd', KEYS[2], values[i], values[i+1]); " +
// "end;" +
// "local res = redis.call('sort', KEYS[2], unpack(ARGV, 3, #ARGV)); " +
// "redis.call('del', KEYS[2]); " +
// "return res;",
// Arrays.asList(getRawName(), tempName), params.toArray());
}
@Override
public RFuture<Set<V>> readSortAlphaAsync(SortOrder order) {
return readSortAsync(null, null, order, -1, -1, true);
}
@Override
public RFuture<Set<V>> readSortAlphaAsync(SortOrder order, int offset, int count) {
return readSortAsync(null, null, order, offset, count, true);
}
@Override
public RFuture<Set<V>> readSortAlphaAsync(String byPattern, SortOrder order) {
return readSortAsync(byPattern, null, order, -1, -1, true);
}
@Override
public RFuture<Set<V>> readSortAlphaAsync(String byPattern, SortOrder order, int offset, int count) {
return readSortAsync(byPattern, null, order, offset, count, true);
}
@Override
public <T> RFuture<Collection<T>> readSortAlphaAsync(String byPattern, List<String> getPatterns, SortOrder order) {
return readSortAsync(byPattern, getPatterns, order, -1, -1, true);
}
@Override
public <T> RFuture<Collection<T>> readSortAlphaAsync(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
return readSortAsync(byPattern, getPatterns, order, offset, count, true);
}
@Override
public RFuture<Integer> sortToAsync(String destName, SortOrder order) {
return sortToAsync(destName, null, null, order, -1, -1);
}
@Override
public RFuture<Integer> sortToAsync(String destName, SortOrder order, int offset, int count) {
return sortToAsync(destName, null, null, order, offset, count);
}
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, SortOrder order) {
return sortToAsync(destName, byPattern, null, order, -1, -1);
}
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, SortOrder order, int offset, int count) {
return sortToAsync(destName, byPattern, null, order, offset, count);
}
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, List<String> getPatterns, SortOrder order) {
return sortToAsync(destName, byPattern, getPatterns, order, -1, -1);
}
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
throw new UnsupportedOperationException();
// List<Object> params = new ArrayList<>();
// params.add(System.currentTimeMillis());
// params.add(92233720368547758L);
// String tempName = prefixName("__redisson_cache_temp", getRawName());
// params.add(tempName);
// if (byPattern != null) {
// params.add("BY");
// params.add(byPattern);
// }
// if (offset != -1 && count != -1) {
// params.add("LIMIT");
// }
// if (offset != -1) {
// params.add(offset);
// }
// if (count != -1) {
// params.add(count);
// }
// if (getPatterns != null) {
// for (String pattern : getPatterns) {
// params.add("GET");
// params.add(pattern);
// }
// }
// params.add(order);
// params.add("STORE");
// params.add(destName);
//
// return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
// "local values = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2], 'WITHSCORES');" +
// "for i = 1, #values, 2 do "
// + "redis.call('zadd', KEYS[2], values[i], values[i+1]); " +
// "end;" +
// "local res = redis.call('sort', unpack(ARGV, 3, #ARGV)); " +
// "redis.call('del', KEYS[2]); " +
// "return res;",
// Arrays.asList(getRawName(), tempName), params.toArray());
}
}

@ -39,121 +39,8 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
*
* @param <V> value
*/
public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V>, RDestroyable {
public interface RSetCache<V> extends RSet<V>, RExpirable, RSetCacheAsync<V>, RDestroyable {
/**
* Returns <code>RCountDownLatch</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RCountDownLatch object
*/
RCountDownLatch getCountDownLatch(V value);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphore getSemaphore(V value);
/**
* Returns <code>RLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLock getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLock getReadWriteLock(V value);
/**
* Returns lock instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLock getLock(V value);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(int count);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(String pattern, int count);
/**
* Returns stream of elements in this set matches <code>pattern</code>.
*
* @param pattern - search pattern
* @return stream of elements
*/
Stream<V> stream(String pattern);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(int count);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(String pattern, int count);
/**
* Returns values iterator matches <code>pattern</code>.
*
* @param pattern - search pattern
* @return iterator
*/
Iterator<V> iterator(String pattern);
/**
* Returns <code>RMapReduce</code> object associated with this map
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
/**
* Stores value with specified time to live.
* Value expires after specified time to live.
@ -177,22 +64,6 @@ public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V>, RDe
@Override
int size();
/**
* Read all elements at once
*
* @return values
*/
Set<V> readAll();
/**
* Tries to add elements only if none of them in set.
*
* @param values - values to add
* @return <code>true</code> if elements successfully added,
* otherwise <code>false</code>.
*/
boolean tryAdd(V... values);
/**
* Tries to add elements only if none of them in set.
*

@ -15,7 +15,6 @@
*/
package org.redisson.api;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -25,7 +24,7 @@ import java.util.concurrent.TimeUnit;
*
* @param <V> value
*/
public interface RSetCacheAsync<V> extends RCollectionAsync<V> {
public interface RSetCacheAsync<V> extends RSetAsync<V> {
/**
* Stores value with specified time to live.
@ -50,22 +49,6 @@ public interface RSetCacheAsync<V> extends RCollectionAsync<V> {
@Override
RFuture<Integer> sizeAsync();
/**
* Read all elements at once
*
* @return values
*/
RFuture<Set<V>> readAllAsync();
/**
* Tries to add elements only if none of them in set.
*
* @param values - values to add
* @return <code>true</code> if elements successfully added,
* otherwise <code>false</code>.
*/
RFuture<Boolean> tryAddAsync(V... values);
/**
* Tries to add elements only if none of them in set.
*

@ -94,6 +94,7 @@ public interface RedisCommands {
RedisCommand<Boolean> ZADD_RAW = new RedisCommand<Boolean>("ZADD");
RedisStrictCommand<Integer> ZADD_INT = new RedisStrictCommand<Integer>("ZADD", new IntegerReplayConvertor());
RedisCommand<Long> ZADD = new RedisCommand<Long>("ZADD");
RedisStrictCommand<Integer> ZREM_INT = new RedisStrictCommand<>("ZREM", new IntegerReplayConvertor());
RedisStrictCommand<Long> ZREM_LONG = new RedisStrictCommand<Long>("ZREM");
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor());
RedisStrictCommand<Integer> ZCARD_INT = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());

@ -1,19 +1,5 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.joor.Reflect;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -21,6 +7,14 @@ import org.redisson.api.RSetCache;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.eviction.EvictionScheduler;
import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSetCacheTest extends BaseTest {
public static class SimpleBean implements Serializable {
@ -513,7 +507,97 @@ public class RedissonSetCacheTest extends BaseTest {
Assertions.assertEquals(0, cache.size());
cache.destroy();
}
@Test
public void testUnion() throws InterruptedException {
redisson.getKeys().flushall();
RSetCache<Integer> cache1 = redisson.getSetCache("cache1", IntegerCodec.INSTANCE);
cache1.add(1);
cache1.add(2, 1, TimeUnit.SECONDS);
cache1.add(5, 1, TimeUnit.SECONDS);
cache1.add(3);
RSetCache<Integer> cache2 = redisson.getSetCache("cache2", IntegerCodec.INSTANCE);
cache2.add(4);
cache2.add(2, 1, TimeUnit.SECONDS);
cache2.add(5, 1, TimeUnit.SECONDS);
cache2.add(7);
RSetCache<Integer> cache3 = redisson.getSetCache("cache3", IntegerCodec.INSTANCE);
assertThat(cache3.union("cache1", "cache2")).isEqualTo(6);
assertThat(cache3).containsExactlyInAnyOrder(1, 3, 2, 5, 4, 7);
cache3.clear();
Thread.sleep(1500);
assertThat(cache3.union("cache1", "cache2")).isEqualTo(4);
assertThat(cache3).containsExactlyInAnyOrder(1, 3, 4, 7);
assertThat(redisson.getKeys().getKeys()).containsExactlyInAnyOrder("cache1", "cache2", "cache3");
}
@Test
public void testDiff() throws InterruptedException {
redisson.getKeys().flushall();
RSetCache<Integer> cache1 = redisson.getSetCache("cache1", IntegerCodec.INSTANCE);
cache1.add(1);
cache1.add(2, 1, TimeUnit.SECONDS);
cache1.add(5, 1, TimeUnit.SECONDS);
cache1.add(3, 1, TimeUnit.SECONDS);
RSetCache<Integer> cache2 = redisson.getSetCache("cache2", IntegerCodec.INSTANCE);
cache2.add(4);
cache2.add(2, 1, TimeUnit.SECONDS);
cache2.add(5, 1, TimeUnit.SECONDS);
cache2.add(7);
RSetCache<Integer> cache3 = redisson.getSetCache("cache3", IntegerCodec.INSTANCE);
assertThat(cache3.diff("cache1", "cache2")).isEqualTo(2);
assertThat(cache3).containsExactlyInAnyOrder(1, 3);
cache3.clear();
Thread.sleep(1500);
assertThat(cache3.diff("cache1", "cache2")).isEqualTo(1);
assertThat(cache3).containsExactlyInAnyOrder(1);
assertThat(redisson.getKeys().getKeys()).containsExactlyInAnyOrder("cache1", "cache2", "cache3");
}
@Test
public void testIntersection() throws InterruptedException {
redisson.getKeys().flushall();
RSetCache<Integer> cache1 = redisson.getSetCache("cache1");
cache1.add(1);
cache1.add(2, 1, TimeUnit.SECONDS);
cache1.add(5, 1, TimeUnit.SECONDS);
cache1.add(3);
RSetCache<Integer> cache2 = redisson.getSetCache("cache2");
cache2.add(4);
cache2.add(2, 1, TimeUnit.SECONDS);
cache2.add(5, 1, TimeUnit.SECONDS);
cache2.add(7);
assertThat(cache1.countIntersection("cache2")).isEqualTo(2);
RSetCache<Integer> cache3 = redisson.getSetCache("cache3");
assertThat(cache3.intersection("cache1", "cache2")).isEqualTo(2);
assertThat(cache3).containsExactlyInAnyOrder(2, 5);
cache3.clear();
Thread.sleep(1500);
assertThat(cache1.countIntersection("cache2")).isEqualTo(0);
assertThat(cache3.intersection("cache1", "cache2")).isEqualTo(0);
assertThat(cache3).isEmpty();
assertThat(redisson.getKeys().count()).isEqualTo(2);
assertThat(redisson.getKeys().getKeys()).containsExactlyInAnyOrder("cache1", "cache2");
}
}

Loading…
Cancel
Save