diff --git a/redisson/src/main/java/org/redisson/BaseRedissonList.java b/redisson/src/main/java/org/redisson/BaseRedissonList.java new file mode 100644 index 000000000..c2dda802a --- /dev/null +++ b/redisson/src/main/java/org/redisson/BaseRedissonList.java @@ -0,0 +1,897 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import org.redisson.api.*; +import org.redisson.api.listener.*; +import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; +import org.redisson.client.RedisException; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; +import org.redisson.client.protocol.convertor.Convertor; +import org.redisson.client.protocol.convertor.IntegerReplayConvertor; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.iterator.RedissonBaseIterator; +import org.redisson.iterator.RedissonListIterator; +import org.redisson.mapreduce.RedissonCollectionMapReduce; +import org.redisson.misc.CompletableFutureWrapper; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.function.Predicate; + +import static org.redisson.client.protocol.RedisCommands.*; + +/** + * Base list implementation + * + * @author Nikita Koksharov + * + * @param the type of elements held in this collection + */ +public class BaseRedissonList extends RedissonExpirable { + + private RedissonClient redisson; + + BaseRedissonList(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + super(commandExecutor, name); + this.redisson = redisson; + } + + BaseRedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + super(codec, commandExecutor, name); + this.redisson = redisson; + } + + public RCollectionMapReduce mapReduce() { + return new RedissonCollectionMapReduce(this, redisson, commandExecutor); + } + + public int size() { + return get(sizeAsync()); + } + + public RFuture sizeAsync() { + return commandExecutor.readAsync(getRawName(), codec, LLEN_INT, getRawName()); + } + + public boolean isEmpty() { + return size() == 0; + } + + public boolean contains(Object o) { + return get(containsAsync(o)); + } + + public Iterator iterator() { + return listIterator(); + } + + public Object[] toArray() { + List list = readAll(); + return list.toArray(); + } + + public List readAll() { + return get(readAllAsync()); + } + + public RFuture> readAllAsync() { + return commandExecutor.readAsync(getRawName(), codec, LRANGE, getRawName(), 0, -1); + } + + public T[] toArray(T[] a) { + List list = readAll(); + return list.toArray(a); + } + + public boolean add(V e) { + return get(addAsync(e)); + } + + public RFuture addAsync(V e) { + return addAsync(e, RPUSH_BOOLEAN); + } + + protected RFuture addAsync(V e, RedisCommand command) { + return commandExecutor.writeAsync(getRawName(), codec, command, getRawName(), encode(e)); + } + + public boolean remove(Object o) { + return get(removeAsync(o)); + } + + public RFuture removeAsync(Object o) { + return removeAsync(o, 1); + } + + public RFuture removeAsync(Object o, int count) { + return commandExecutor.writeAsync(getRawName(), codec, LREM, getRawName(), count, encode(o)); + } + + public boolean remove(Object o, int count) { + return get(removeAsync(o, count)); + } + + public RFuture containsAllAsync(Collection c) { + if (c.isEmpty()) { + return new CompletableFutureWrapper<>(true); + } + + return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local items = redis.call('lrange', KEYS[1], 0, -1) " + + "for i=1, #items do " + + "for j = 1, #ARGV, 1 do " + + "if items[i] == ARGV[j] then " + + "table.remove(ARGV, j) " + + "end " + + "end " + + "end " + + "return #ARGV == 0 and 1 or 0", + Collections.singletonList(getRawName()), encode(c).toArray()); + } + + public boolean containsAll(Collection c) { + return get(containsAllAsync(c)); + } + + public boolean addAll(Collection c) { + return get(addAllAsync(c)); + } + + public RFuture addAllAsync(Collection c) { + if (c.isEmpty()) { + return new CompletableFutureWrapper<>(false); + } + + List args = new ArrayList(c.size() + 1); + args.add(getRawName()); + encode(args, c); + return commandExecutor.writeAsync(getRawName(), codec, RPUSH_BOOLEAN, args.toArray()); + } + + public RFuture addAllAsync(int index, Collection coll) { + if (index < 0) { + throw new IndexOutOfBoundsException("index: " + index); + } + + if (coll.isEmpty()) { + return new CompletableFutureWrapper<>(false); + } + + if (index == 0) { // prepend elements to list + List elements = new ArrayList(); + encode(elements, coll); + Collections.reverse(elements); + elements.add(0, getRawName()); + + return commandExecutor.writeAsync(getRawName(), codec, LPUSH_BOOLEAN, elements.toArray()); + } + + List args = new ArrayList(coll.size() + 1); + args.add(index); + encode(args, coll); + + return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local ind = table.remove(ARGV, 1); " + // index is the first parameter + "local size = redis.call('llen', KEYS[1]); " + + "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " + + "local tail = redis.call('lrange', KEYS[1], ind, -1); " + + "redis.call('ltrim', KEYS[1], 0, ind - 1); " + + "for i=1, #ARGV, 5000 do " + + "redis.call('rpush', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); " + + "end " + + "if #tail > 0 then " + + "for i=1, #tail, 5000 do " + + "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); " + + "end " + + "end;" + + "return 1;", + Collections.singletonList(getRawName()), args.toArray()); + } + + public boolean addAll(int index, Collection coll) { + return get(addAllAsync(index, coll)); + } + + public RFuture removeAllAsync(Collection c) { + if (c.isEmpty()) { + return new CompletableFutureWrapper<>(false); + } + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local v = 0 " + + "for i = 1, #ARGV, 1 do " + + "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 " + + "then v = 1 end " + +"end " + + "return v ", + Collections.singletonList(getRawName()), encode(c).toArray()); + } + + public boolean removeAll(Collection c) { + return get(removeAllAsync(c)); + } + + public boolean retainAll(Collection c) { + return get(retainAllAsync(c)); + } + + public RFuture retainAllAsync(Collection c) { + if (c.isEmpty()) { + return deleteAsync(); + } + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local changed = 0 " + + "local items = redis.call('lrange', KEYS[1], 0, -1) " + + "local i = 1 " + + "while i <= #items do " + + "local element = items[i] " + + "local isInAgrs = false " + + "for j = 1, #ARGV, 1 do " + + "if ARGV[j] == element then " + + "isInAgrs = true " + + "break " + + "end " + + "end " + + "if isInAgrs == false then " + + "redis.call('LREM', KEYS[1], 0, element) " + + "changed = 1 " + + "end " + + "i = i + 1 " + + "end " + + "return changed ", + Collections.singletonList(getRawName()), encode(c).toArray()); + } + + + public void clear() { + delete(); + } + + public RFuture getAsync(int index) { + return commandExecutor.readAsync(getRawName(), codec, LINDEX, getRawName(), index); + } + + public List get(int... indexes) { + return get(getAsync(indexes)); + } + + public Iterator distributedIterator(final int count) { + String iteratorName = "__redisson_list_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, count); + } + + public Iterator distributedIterator(final String iteratorName, final int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, count); + } + + @Override + protected void remove(Object value) { + BaseRedissonList.this.remove((V) value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, int count) { + return get(distributedScanIteratorAsync(iteratorName, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, int count) { + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SCAN, + "local start_index = redis.call('get', KEYS[2]); " + + "if start_index ~= false then " + + "start_index = tonumber(start_index); " + + "else " + + "start_index = 0;" + + "end;" + + "if start_index == -1 then " + + "return {0, {}};" + + "end;" + + "local end_index = start_index + ARGV[1];" + + "local result; " + + "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); " + + "if end_index > redis.call('llen', KEYS[1]) then " + + "end_index = -1;" + + "end; " + + "redis.call('setex', KEYS[2], 3600, end_index);" + + "return {end_index, result};", + Arrays.asList(getRawName(), iteratorName), count); + } + + public RFuture> getAsync(int... indexes) { + List params = new ArrayList(); + for (Integer index : indexes) { + params.add(index); + } + return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_LIST, + "local result = {}; " + + "for i = 1, #ARGV, 1 do " + + "local value = redis.call('lindex', KEYS[1], ARGV[i]);" + + "table.insert(result, value);" + + "end; " + + "return result;", + Collections.singletonList(getRawName()), params.toArray()); + } + + + public V get(int index) { + return getValue(index); + } + + V getValue(int index) { + return get(getAsync(index)); + } + + public V set(int index, V element) { + try { + return get(setAsync(index, element)); + } catch (RedisException e) { + if (e.getCause() instanceof IndexOutOfBoundsException) { + throw (IndexOutOfBoundsException) e.getCause(); + } + throw e; + } + } + + public RFuture setAsync(int index, V element) { + RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT, + "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + + "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + + "return v", + Collections.singletonList(getRawName()), index, encode(element)); + CompletionStage f = future.handle((res, e) -> { + if (e != null) { + if (e.getMessage().contains("ERR index out of range")) { + throw new CompletionException(new IndexOutOfBoundsException("index out of range")); + } + throw new CompletionException(e); + } + return res; + }); + return new CompletableFutureWrapper<>(f); + } + + public void fastSet(int index, V element) { + get(fastSetAsync(index, element)); + } + + public RFuture fastSetAsync(int index, V element) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LSET, getRawName(), index, encode(element)); + } + + public void add(int index, V element) { + addAll(index, Collections.singleton(element)); + } + + public RFuture addAsync(int index, V element) { + return addAllAsync(index, Collections.singleton(element)); + } + + public V remove(int index) { + return get(removeAsync(index)); + } + + public RFuture removeAsync(int index) { + if (index == 0) { + return commandExecutor.writeAsync(getRawName(), codec, LPOP, getRawName()); + } + + return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_OBJECT, + "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" + + "return v", + Collections.singletonList(getRawName()), index); + } + + + public void fastRemove(int index) { + get(fastRemoveAsync(index)); + } + + public RFuture fastRemoveAsync(int index) { + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID, + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');", + Collections.singletonList(getRawName()), index); + } + + public int indexOf(Object o) { + return get(indexOfAsync(o)); + } + + public RFuture containsAsync(Object o) { + return indexOfAsync(o, new BooleanNumberReplayConvertor(-1L)); + } + + public RFuture indexOfAsync(Object o, Convertor convertor) { + return commandExecutor.evalReadAsync(getRawName(), codec, new RedisCommand("EVAL", convertor), + "local key = KEYS[1] " + + "local obj = ARGV[1] " + + "local items = redis.call('lrange', key, 0, -1) " + + "for i=1,#items do " + + "if items[i] == obj then " + + "return i - 1 " + + "end " + + "end " + + "return -1", + Collections.singletonList(getRawName()), encode(o)); + } + + public RFuture indexOfAsync(Object o) { + return indexOfAsync(o, new IntegerReplayConvertor()); + } + + public int lastIndexOf(Object o) { + return get(lastIndexOfAsync(o)); + } + + public RFuture lastIndexOfAsync(Object o) { + return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_INTEGER, + "local key = KEYS[1] " + + "local obj = ARGV[1] " + + "local items = redis.call('lrange', key, 0, -1) " + + "for i = #items, 1, -1 do " + + "if items[i] == obj then " + + "return i - 1 " + + "end " + + "end " + + "return -1", + Collections.singletonList(getRawName()), encode(o)); + } + + public RFuture lastIndexOfAsync(Object o, Convertor convertor) { + return commandExecutor.evalReadAsync(getRawName(), codec, new RedisCommand("EVAL", convertor), + "local key = KEYS[1] " + + "local obj = ARGV[1] " + + "local items = redis.call('lrange', key, 0, -1) " + + "for i = #items, 1, -1 do " + + "if items[i] == obj then " + + "return i - 1 " + + "end " + + "end " + + "return -1", + Collections.singletonList(getRawName()), encode(o)); + } + + public void trim(int fromIndex, int toIndex) { + get(trimAsync(fromIndex, toIndex)); + } + + public RFuture trimAsync(int fromIndex, int toIndex) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LTRIM, getRawName(), fromIndex, toIndex); + } + + public ListIterator listIterator() { + return listIterator(0); + } + + public ListIterator listIterator(int ind) { + return new RedissonListIterator(ind) { + + @Override + public V getValue(int index) { + return BaseRedissonList.this.getValue(index); + } + + @Override + public V remove(int index) { + return BaseRedissonList.this.remove(index); + } + + @Override + public void fastSet(int index, V value) { + BaseRedissonList.this.fastSet(index, value); + } + + @Override + public void add(int index, V value) { + BaseRedissonList.this.add(index, value); + } + }; + } + + public RList subList(int fromIndex, int toIndex) { + int size = size(); + if (fromIndex < 0 || toIndex > size) { + throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + " toIndex: " + toIndex + " size: " + size); + } + if (fromIndex > toIndex) { + throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex); + } + + return new RedissonSubList(codec, commandExecutor, getRawName(), fromIndex, toIndex); + } + + @Override + @SuppressWarnings("AvoidInlineConditionals") + public String toString() { + Iterator it = iterator(); + if (! it.hasNext()) + return "[]"; + + StringBuilder sb = new StringBuilder(); + sb.append('['); + for (;;) { + V e = it.next(); + sb.append(e == this ? "(this Collection)" : e); + if (! it.hasNext()) + return sb.append(']').toString(); + sb.append(',').append(' '); + } + } + + @Override + @SuppressWarnings("AvoidInlineConditionals") + public boolean equals(Object o) { + if (o == this) + return true; + if (!(o instanceof List)) + return false; + + Iterator e1 = iterator(); + Iterator e2 = ((List) o).iterator(); + while (e1.hasNext() && e2.hasNext()) { + V o1 = e1.next(); + Object o2 = e2.next(); + if (!(o1==null ? o2==null : o1.equals(o2))) + return false; + } + return !(e1.hasNext() || e2.hasNext()); + } + + @Override + @SuppressWarnings("AvoidInlineConditionals") + public int hashCode() { + int hashCode = 1; + Iterable ii = () -> iterator(); + for (V e : ii) { + hashCode = 31*hashCode + (e==null ? 0 : e.hashCode()); + } + return hashCode; + } + + public RFuture addAfterAsync(V elementToFind, V element) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LINSERT_INT, getRawName(), "AFTER", encode(elementToFind), encode(element)); + } + + public RFuture addBeforeAsync(V elementToFind, V element) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LINSERT_INT, getRawName(), "BEFORE", encode(elementToFind), encode(element)); + } + + public int addAfter(V elementToFind, V element) { + return get(addAfterAsync(elementToFind, element)); + } + + public int addBefore(V elementToFind, V element) { + return get(addBeforeAsync(elementToFind, element)); + } + + public List readSort(SortOrder order) { + return get(readSortAsync(order)); + } + + public RFuture> readSortAsync(SortOrder order) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), order); + } + + public List readSort(SortOrder order, int offset, int count) { + return get(readSortAsync(order, offset, count)); + } + + public RFuture> readSortAsync(SortOrder order, int offset, int count) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "LIMIT", offset, count, order); + } + + public List readSort(String byPattern, SortOrder order) { + return get(readSortAsync(byPattern, order)); + } + + public RFuture> readSortAsync(String byPattern, SortOrder order) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, order); + } + + public List readSort(String byPattern, SortOrder order, int offset, int count) { + return get(readSortAsync(byPattern, order, offset, count)); + } + + public RFuture> readSortAsync(String byPattern, SortOrder order, int offset, int count) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, "LIMIT", offset, count, order); + } + + public Collection readSort(String byPattern, List getPatterns, SortOrder order) { + return (Collection) get(readSortAsync(byPattern, getPatterns, order)); + } + + public RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order) { + return readSortAsync(byPattern, getPatterns, order, -1, -1); + } + + public Collection readSort(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return (Collection) get(readSortAsync(byPattern, getPatterns, order, offset, count)); + } + + public RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return readSortAsync(byPattern, getPatterns, order, offset, count, false); + } + + public List readSortAlpha(SortOrder order) { + return get(readSortAlphaAsync(order)); + } + + public RFuture> readSortAlphaAsync(SortOrder order) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "ALPHA", order); + } + + public List readSortAlpha(SortOrder order, int offset, int count) { + return get(readSortAlphaAsync(order, offset, count)); + } + + public RFuture> readSortAlphaAsync(SortOrder order, int offset, int count) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "LIMIT", offset, count, "ALPHA", order); + } + + public List readSortAlpha(String byPattern, SortOrder order) { + return get(readSortAlphaAsync(byPattern, order)); + } + + public RFuture> readSortAlphaAsync(String byPattern, SortOrder order) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, "ALPHA", order); + } + + public List readSortAlpha(String byPattern, SortOrder order, int offset, int count) { + return get(readSortAlphaAsync(byPattern, order, offset, count)); + } + + public RFuture> readSortAlphaAsync(String byPattern, SortOrder order, int offset, int count) { + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, "LIMIT", offset, count, "ALPHA", order); + } + + public Collection readSortAlpha(String byPattern, List getPatterns, SortOrder order) { + return (Collection) get(readSortAlphaAsync(byPattern, getPatterns, order)); + } + + public RFuture> readSortAlphaAsync(String byPattern, List getPatterns, SortOrder order) { + return readSortAlphaAsync(byPattern, getPatterns, order, -1, -1); + } + + public Collection readSortAlpha(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return (Collection) get(readSortAlphaAsync(byPattern, getPatterns, order, offset, count)); + } + + public RFuture> readSortAlphaAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return readSortAsync(byPattern, getPatterns, order, offset, count, true); + } + + public int sortTo(String destName, SortOrder order) { + return get(sortToAsync(destName, order)); + } + + public RFuture sortToAsync(String destName, SortOrder order) { + return sortToAsync(destName, null, Collections.emptyList(), order, -1, -1); + } + + public int sortTo(String destName, SortOrder order, int offset, int count) { + return get(sortToAsync(destName, order, offset, count)); + } + + public RFuture sortToAsync(String destName, SortOrder order, int offset, int count) { + return sortToAsync(destName, null, Collections.emptyList(), order, offset, count); + } + + public int sortTo(String destName, String byPattern, SortOrder order, int offset, int count) { + return get(sortToAsync(destName, byPattern, order, offset, count)); + } + + public int sortTo(String destName, String byPattern, SortOrder order) { + return get(sortToAsync(destName, byPattern, order)); + } + + public RFuture sortToAsync(String destName, String byPattern, SortOrder order) { + return sortToAsync(destName, byPattern, Collections.emptyList(), order, -1, -1); + } + + public RFuture sortToAsync(String destName, String byPattern, SortOrder order, int offset, int count) { + return sortToAsync(destName, byPattern, Collections.emptyList(), order, offset, count); + } + + public int sortTo(String destName, String byPattern, List getPatterns, SortOrder order) { + return get(sortToAsync(destName, byPattern, getPatterns, order)); + } + + public RFuture sortToAsync(String destName, String byPattern, List getPatterns, SortOrder order) { + return sortToAsync(destName, byPattern, getPatterns, order, -1, -1); + } + + public int sortTo(String destName, String byPattern, List getPatterns, SortOrder order, int offset, int count) { + return get(sortToAsync(destName, byPattern, getPatterns, order, offset, count)); + } + + public RFuture sortToAsync(String destName, String byPattern, List getPatterns, SortOrder order, int offset, int count) { + List params = new ArrayList(); + params.add(getRawName()); + if (byPattern != null) { + params.add("BY"); + params.add(byPattern); + } + if (offset != -1 && count != -1) { + params.add("LIMIT"); + } + if (offset != -1) { + params.add(offset); + } + if (count != -1) { + params.add(count); + } + for (String pattern : getPatterns) { + params.add("GET"); + params.add(pattern); + } + params.add(order); + params.add("STORE"); + params.add(destName); + + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.SORT_TO, params.toArray()); + } + + private RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count, boolean alpha) { + List params = new ArrayList(); + params.add(getRawName()); + if (byPattern != null) { + params.add("BY"); + params.add(byPattern); + } + if (offset != -1 && count != -1) { + params.add("LIMIT"); + } + if (offset != -1) { + params.add(offset); + } + if (count != -1) { + params.add(count); + } + if (getPatterns != null) { + for (String pattern : getPatterns) { + params.add("GET"); + params.add(pattern); + } + } + if (alpha) { + params.add("ALPHA"); + } + if (order != null) { + params.add(order); + } + + return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, params.toArray()); + } + + public RFuture> rangeAsync(int toIndex) { + return rangeAsync(0, toIndex); + } + + public RFuture> rangeAsync(int fromIndex, int toIndex) { + return commandExecutor.readAsync(getRawName(), codec, LRANGE, getRawName(), fromIndex, toIndex); + } + + public List range(int toIndex) { + return get(rangeAsync(toIndex)); + } + + public List range(int fromIndex, int toIndex) { + return get(rangeAsync(fromIndex, toIndex)); + } + + @Override + public int addListener(ObjectListener listener) { + if (listener instanceof ListAddListener) { + return addListener("__keyevent@*:rpush", (ListAddListener) listener, ListAddListener::onListAdd); + } + if (listener instanceof ListRemoveListener) { + return addListener("__keyevent@*:lrem", (ListRemoveListener) listener, ListRemoveListener::onListRemove); + } + if (listener instanceof ListTrimListener) { + return addListener("__keyevent@*:ltrim", (ListTrimListener) listener, ListTrimListener::onListTrim); + } + if (listener instanceof ListSetListener) { + return addListener("__keyevent@*:lset", (ListSetListener) listener, ListSetListener::onListSet); + } + if (listener instanceof ListInsertListener) { + return addListener("__keyevent@*:linsert", (ListInsertListener) listener, ListInsertListener::onListInsert); + } + return super.addListener(listener); + } + + @Override + public RFuture addListenerAsync(ObjectListener listener) { + if (listener instanceof ListAddListener) { + return addListenerAsync("__keyevent@*:rpush", (ListAddListener) listener, ListAddListener::onListAdd); + } + if (listener instanceof ListRemoveListener) { + return addListenerAsync("__keyevent@*:lrem", (ListRemoveListener) listener, ListRemoveListener::onListRemove); + } + if (listener instanceof ListTrimListener) { + return addListenerAsync("__keyevent@*:ltrim", (ListTrimListener) listener, ListTrimListener::onListTrim); + } + if (listener instanceof ListSetListener) { + return addListenerAsync("__keyevent@*:lset", (ListSetListener) listener, ListSetListener::onListSet); + } + if (listener instanceof ListInsertListener) { + return addListenerAsync("__keyevent@*:linsert", (ListInsertListener) listener, ListInsertListener::onListInsert); + } + return super.addListenerAsync(listener); + } + + @Override + public void removeListener(int listenerId) { + RPatternTopic addTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:rpush"); + addTopic.removeListener(listenerId); + + RPatternTopic remTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lrem"); + remTopic.removeListener(listenerId); + + RPatternTopic trimTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:ltrim"); + trimTopic.removeListener(listenerId); + + RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lset"); + setTopic.removeListener(listenerId); + + RPatternTopic insertTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:linsert"); + insertTopic.removeListener(listenerId); + + super.removeListener(listenerId); + } + + @Override + public RFuture removeListenerAsync(int listenerId) { + RPatternTopic addTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:rpush"); + RFuture f1 = addTopic.removeListenerAsync(listenerId); + + RPatternTopic remTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lrem"); + RFuture f2 = remTopic.removeListenerAsync(listenerId); + + RPatternTopic trimTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:ltrim"); + RFuture f3 = trimTopic.removeListenerAsync(listenerId); + + RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lset"); + RFuture f4 = setTopic.removeListenerAsync(listenerId); + + RPatternTopic insertTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:linsert"); + RFuture f5 = insertTopic.removeListenerAsync(listenerId); + + RFuture f6 = super.removeListenerAsync(listenerId); + + CompletableFuture f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture(), f3.toCompletableFuture(), + f4.toCompletableFuture(), f5.toCompletableFuture(), f5.toCompletableFuture(), f6.toCompletableFuture()); + return new CompletableFutureWrapper<>(f); + } + + public boolean removeIf(Predicate filter) { + throw new UnsupportedOperationException(); + } +} diff --git a/redisson/src/main/java/org/redisson/RedissonDeque.java b/redisson/src/main/java/org/redisson/RedissonDeque.java index 8e9b89005..33739a17b 100644 --- a/redisson/src/main/java/org/redisson/RedissonDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonDeque.java @@ -15,8 +15,6 @@ */ package org.redisson; -import java.util.*; - import org.redisson.api.RDeque; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -28,6 +26,8 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; import org.redisson.command.CommandAsyncExecutor; +import java.util.*; + /** * Distributed and concurrent implementation of {@link java.util.Queue} * @@ -332,8 +332,4 @@ public class RedissonDeque extends RedissonQueue implements RDeque { return remove(o, -1); } - public RedissonDeque reversed() { - throw new UnsupportedOperationException(); - } - } diff --git a/redisson/src/main/java/org/redisson/RedissonList.java b/redisson/src/main/java/org/redisson/RedissonList.java index 36871d9d9..d823d819e 100644 --- a/redisson/src/main/java/org/redisson/RedissonList.java +++ b/redisson/src/main/java/org/redisson/RedissonList.java @@ -15,31 +15,10 @@ */ package org.redisson; -import org.redisson.api.*; -import org.redisson.api.listener.*; -import org.redisson.api.mapreduce.RCollectionMapReduce; -import org.redisson.client.RedisClient; -import org.redisson.client.RedisException; +import org.redisson.api.RList; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; -import org.redisson.client.protocol.convertor.Convertor; -import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.iterator.RedissonBaseIterator; -import org.redisson.iterator.RedissonListIterator; -import org.redisson.mapreduce.RedissonCollectionMapReduce; -import org.redisson.misc.CompletableFutureWrapper; - -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; -import java.util.function.Predicate; - -import static org.redisson.client.protocol.RedisCommands.*; /** * Distributed and concurrent implementation of {@link java.util.List} @@ -48,944 +27,14 @@ import static org.redisson.client.protocol.RedisCommands.*; * * @param the type of elements held in this collection */ -public class RedissonList extends RedissonExpirable implements RList { +public class RedissonList extends BaseRedissonList implements RList { - private RedissonClient redisson; - public RedissonList(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { - super(commandExecutor, name); - this.redisson = redisson; + super(commandExecutor, name, redisson); } public RedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { - super(codec, commandExecutor, name); - this.redisson = redisson; - } - - @Override - public RCollectionMapReduce mapReduce() { - return new RedissonCollectionMapReduce(this, redisson, commandExecutor); - } - - @Override - public int size() { - return get(sizeAsync()); - } - - public RFuture sizeAsync() { - return commandExecutor.readAsync(getRawName(), codec, LLEN_INT, getRawName()); - } - - @Override - public boolean isEmpty() { - return size() == 0; - } - - @Override - public boolean contains(Object o) { - return get(containsAsync(o)); - } - - @Override - public Iterator iterator() { - return listIterator(); - } - - @Override - public Object[] toArray() { - List list = readAll(); - return list.toArray(); - } - - @Override - public List readAll() { - return get(readAllAsync()); - } - - @Override - public RFuture> readAllAsync() { - return commandExecutor.readAsync(getRawName(), codec, LRANGE, getRawName(), 0, -1); - } - - @Override - public T[] toArray(T[] a) { - List list = readAll(); - return list.toArray(a); - } - - @Override - public boolean add(V e) { - return get(addAsync(e)); - } - - @Override - public RFuture addAsync(V e) { - return addAsync(e, RPUSH_BOOLEAN); - } - - protected RFuture addAsync(V e, RedisCommand command) { - return commandExecutor.writeAsync(getRawName(), codec, command, getRawName(), encode(e)); - } - - @Override - public boolean remove(Object o) { - return get(removeAsync(o)); - } - - @Override - public RFuture removeAsync(Object o) { - return removeAsync(o, 1); - } - - @Override - public RFuture removeAsync(Object o, int count) { - return commandExecutor.writeAsync(getRawName(), codec, LREM, getRawName(), count, encode(o)); - } - - @Override - public boolean remove(Object o, int count) { - return get(removeAsync(o, count)); - } - - @Override - public RFuture containsAllAsync(Collection c) { - if (c.isEmpty()) { - return new CompletableFutureWrapper<>(true); - } - - return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, - "local items = redis.call('lrange', KEYS[1], 0, -1) " + - "for i=1, #items do " + - "for j = 1, #ARGV, 1 do " + - "if items[i] == ARGV[j] then " + - "table.remove(ARGV, j) " + - "end " + - "end " + - "end " + - "return #ARGV == 0 and 1 or 0", - Collections.singletonList(getRawName()), encode(c).toArray()); - } - - @Override - public boolean containsAll(Collection c) { - return get(containsAllAsync(c)); - } - - @Override - public boolean addAll(Collection c) { - return get(addAllAsync(c)); - } - - @Override - public RFuture addAllAsync(Collection c) { - if (c.isEmpty()) { - return new CompletableFutureWrapper<>(false); - } - - List args = new ArrayList(c.size() + 1); - args.add(getRawName()); - encode(args, c); - return commandExecutor.writeAsync(getRawName(), codec, RPUSH_BOOLEAN, args.toArray()); - } - - @Override - public RFuture addAllAsync(int index, Collection coll) { - if (index < 0) { - throw new IndexOutOfBoundsException("index: " + index); - } - - if (coll.isEmpty()) { - return new CompletableFutureWrapper<>(false); - } - - if (index == 0) { // prepend elements to list - List elements = new ArrayList(); - encode(elements, coll); - Collections.reverse(elements); - elements.add(0, getRawName()); - - return commandExecutor.writeAsync(getRawName(), codec, LPUSH_BOOLEAN, elements.toArray()); - } - - List args = new ArrayList(coll.size() + 1); - args.add(index); - encode(args, coll); - - return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, - "local ind = table.remove(ARGV, 1); " + // index is the first parameter - "local size = redis.call('llen', KEYS[1]); " + - "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " + - "local tail = redis.call('lrange', KEYS[1], ind, -1); " + - "redis.call('ltrim', KEYS[1], 0, ind - 1); " + - "for i=1, #ARGV, 5000 do " - + "redis.call('rpush', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); " - + "end " + - "if #tail > 0 then " + - "for i=1, #tail, 5000 do " - + "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); " - + "end " - + "end;" + - "return 1;", - Collections.singletonList(getRawName()), args.toArray()); - } - - @Override - public boolean addAll(int index, Collection coll) { - return get(addAllAsync(index, coll)); - } - - @Override - public RFuture removeAllAsync(Collection c) { - if (c.isEmpty()) { - return new CompletableFutureWrapper<>(false); - } - - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, - "local v = 0 " + - "for i = 1, #ARGV, 1 do " - + "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 " - + "then v = 1 end " - +"end " - + "return v ", - Collections.singletonList(getRawName()), encode(c).toArray()); - } - - @Override - public boolean removeAll(Collection c) { - return get(removeAllAsync(c)); - } - - @Override - public boolean retainAll(Collection c) { - return get(retainAllAsync(c)); - } - - @Override - public RFuture retainAllAsync(Collection c) { - if (c.isEmpty()) { - return deleteAsync(); - } - - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, - "local changed = 0 " + - "local items = redis.call('lrange', KEYS[1], 0, -1) " - + "local i = 1 " - + "while i <= #items do " - + "local element = items[i] " - + "local isInAgrs = false " - + "for j = 1, #ARGV, 1 do " - + "if ARGV[j] == element then " - + "isInAgrs = true " - + "break " - + "end " - + "end " - + "if isInAgrs == false then " - + "redis.call('LREM', KEYS[1], 0, element) " - + "changed = 1 " - + "end " - + "i = i + 1 " - + "end " - + "return changed ", - Collections.singletonList(getRawName()), encode(c).toArray()); - } - - - @Override - public void clear() { - delete(); - } - - @Override - public RFuture getAsync(int index) { - return commandExecutor.readAsync(getRawName(), codec, LINDEX, getRawName(), index); - } - - public List get(int... indexes) { - return get(getAsync(indexes)); - } - - @Override - public Iterator distributedIterator(final int count) { - String iteratorName = "__redisson_list_cursor_{" + getRawName() + "}"; - return distributedIterator(iteratorName, count); - } - - @Override - public Iterator distributedIterator(final String iteratorName, final int count) { - return new RedissonBaseIterator() { - - @Override - protected ScanResult iterator(RedisClient client, long nextIterPos) { - return distributedScanIterator(iteratorName, count); - } - - @Override - protected void remove(Object value) { - RedissonList.this.remove((V) value); - } - }; - } - - private ScanResult distributedScanIterator(String iteratorName, int count) { - return get(distributedScanIteratorAsync(iteratorName, count)); - } - - private RFuture> distributedScanIteratorAsync(String iteratorName, int count) { - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SCAN, - "local start_index = redis.call('get', KEYS[2]); " - + "if start_index ~= false then " - + "start_index = tonumber(start_index); " - + "else " - + "start_index = 0;" - + "end;" - + "if start_index == -1 then " - + "return {0, {}};" - + "end;" - + "local end_index = start_index + ARGV[1];" - + "local result; " - + "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); " - + "if end_index > redis.call('llen', KEYS[1]) then " - + "end_index = -1;" - + "end; " - + "redis.call('setex', KEYS[2], 3600, end_index);" - + "return {end_index, result};", - Arrays.asList(getRawName(), iteratorName), count); - } - - public RFuture> getAsync(int... indexes) { - List params = new ArrayList(); - for (Integer index : indexes) { - params.add(index); - } - return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_LIST, - "local result = {}; " + - "for i = 1, #ARGV, 1 do " - + "local value = redis.call('lindex', KEYS[1], ARGV[i]);" - + "table.insert(result, value);" + - "end; " + - "return result;", - Collections.singletonList(getRawName()), params.toArray()); - } - - - @Override - public V get(int index) { - return getValue(index); - } - - V getValue(int index) { - return get(getAsync(index)); - } - - @Override - public V set(int index, V element) { - try { - return get(setAsync(index, element)); - } catch (RedisException e) { - if (e.getCause() instanceof IndexOutOfBoundsException) { - throw (IndexOutOfBoundsException) e.getCause(); - } - throw e; - } - } - - @Override - public RFuture setAsync(int index, V element) { - RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT, - "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + - "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + - "return v", - Collections.singletonList(getRawName()), index, encode(element)); - CompletionStage f = future.handle((res, e) -> { - if (e != null) { - if (e.getMessage().contains("ERR index out of range")) { - throw new CompletionException(new IndexOutOfBoundsException("index out of range")); - } - throw new CompletionException(e); - } - return res; - }); - return new CompletableFutureWrapper<>(f); - } - - @Override - public void fastSet(int index, V element) { - get(fastSetAsync(index, element)); - } - - @Override - public RFuture fastSetAsync(int index, V element) { - return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LSET, getRawName(), index, encode(element)); - } - - @Override - public void add(int index, V element) { - addAll(index, Collections.singleton(element)); - } - - @Override - public RFuture addAsync(int index, V element) { - return addAllAsync(index, Collections.singleton(element)); - } - - @Override - public V remove(int index) { - return get(removeAsync(index)); - } - - @Override - public RFuture removeAsync(int index) { - if (index == 0) { - return commandExecutor.writeAsync(getRawName(), codec, LPOP, getRawName()); - } - - return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_OBJECT, - "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + - "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + - "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" + - "return v", - Collections.singletonList(getRawName()), index); - } - - - @Override - public void fastRemove(int index) { - get(fastRemoveAsync(index)); - } - - @Override - public RFuture fastRemoveAsync(int index) { - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID, - "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + - "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');", - Collections.singletonList(getRawName()), index); - } - - @Override - public int indexOf(Object o) { - return get(indexOfAsync(o)); - } - - @Override - public RFuture containsAsync(Object o) { - return indexOfAsync(o, new BooleanNumberReplayConvertor(-1L)); - } - - public RFuture indexOfAsync(Object o, Convertor convertor) { - return commandExecutor.evalReadAsync(getRawName(), codec, new RedisCommand("EVAL", convertor), - "local key = KEYS[1] " + - "local obj = ARGV[1] " + - "local items = redis.call('lrange', key, 0, -1) " + - "for i=1,#items do " + - "if items[i] == obj then " + - "return i - 1 " + - "end " + - "end " + - "return -1", - Collections.singletonList(getRawName()), encode(o)); - } - - @Override - public RFuture indexOfAsync(Object o) { - return indexOfAsync(o, new IntegerReplayConvertor()); - } - - @Override - public int lastIndexOf(Object o) { - return get(lastIndexOfAsync(o)); - } - - @Override - public RFuture lastIndexOfAsync(Object o) { - return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_INTEGER, - "local key = KEYS[1] " + - "local obj = ARGV[1] " + - "local items = redis.call('lrange', key, 0, -1) " + - "for i = #items, 1, -1 do " + - "if items[i] == obj then " + - "return i - 1 " + - "end " + - "end " + - "return -1", - Collections.singletonList(getRawName()), encode(o)); - } - - public RFuture lastIndexOfAsync(Object o, Convertor convertor) { - return commandExecutor.evalReadAsync(getRawName(), codec, new RedisCommand("EVAL", convertor), - "local key = KEYS[1] " + - "local obj = ARGV[1] " + - "local items = redis.call('lrange', key, 0, -1) " + - "for i = #items, 1, -1 do " + - "if items[i] == obj then " + - "return i - 1 " + - "end " + - "end " + - "return -1", - Collections.singletonList(getRawName()), encode(o)); - } - - @Override - public void trim(int fromIndex, int toIndex) { - get(trimAsync(fromIndex, toIndex)); - } - - @Override - public RFuture trimAsync(int fromIndex, int toIndex) { - return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LTRIM, getRawName(), fromIndex, toIndex); + super(codec, commandExecutor, name, redisson); } - @Override - public ListIterator listIterator() { - return listIterator(0); - } - - @Override - public ListIterator listIterator(int ind) { - return new RedissonListIterator(ind) { - - @Override - public V getValue(int index) { - return RedissonList.this.getValue(index); - } - - @Override - public V remove(int index) { - return RedissonList.this.remove(index); - } - - @Override - public void fastSet(int index, V value) { - RedissonList.this.fastSet(index, value); - } - - @Override - public void add(int index, V value) { - RedissonList.this.add(index, value); - } - }; - } - - @Override - public RList subList(int fromIndex, int toIndex) { - int size = size(); - if (fromIndex < 0 || toIndex > size) { - throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + " toIndex: " + toIndex + " size: " + size); - } - if (fromIndex > toIndex) { - throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex); - } - - return new RedissonSubList(codec, commandExecutor, getRawName(), fromIndex, toIndex); - } - - @Override - @SuppressWarnings("AvoidInlineConditionals") - public String toString() { - Iterator it = iterator(); - if (! it.hasNext()) - return "[]"; - - StringBuilder sb = new StringBuilder(); - sb.append('['); - for (;;) { - V e = it.next(); - sb.append(e == this ? "(this Collection)" : e); - if (! it.hasNext()) - return sb.append(']').toString(); - sb.append(',').append(' '); - } - } - - @Override - @SuppressWarnings("AvoidInlineConditionals") - public boolean equals(Object o) { - if (o == this) - return true; - if (!(o instanceof List)) - return false; - - Iterator e1 = iterator(); - Iterator e2 = ((List) o).iterator(); - while (e1.hasNext() && e2.hasNext()) { - V o1 = e1.next(); - Object o2 = e2.next(); - if (!(o1==null ? o2==null : o1.equals(o2))) - return false; - } - return !(e1.hasNext() || e2.hasNext()); - } - - @Override - @SuppressWarnings("AvoidInlineConditionals") - public int hashCode() { - int hashCode = 1; - for (V e : this) { - hashCode = 31*hashCode + (e==null ? 0 : e.hashCode()); - } - return hashCode; - } - - @Override - public RFuture addAfterAsync(V elementToFind, V element) { - return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LINSERT_INT, getRawName(), "AFTER", encode(elementToFind), encode(element)); - } - - @Override - public RFuture addBeforeAsync(V elementToFind, V element) { - return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.LINSERT_INT, getRawName(), "BEFORE", encode(elementToFind), encode(element)); - } - - @Override - public int addAfter(V elementToFind, V element) { - return get(addAfterAsync(elementToFind, element)); - } - - @Override - public int addBefore(V elementToFind, V element) { - return get(addBeforeAsync(elementToFind, element)); - } - - @Override - public List readSort(SortOrder order) { - return get(readSortAsync(order)); - } - - @Override - public RFuture> readSortAsync(SortOrder order) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), order); - } - - @Override - public List readSort(SortOrder order, int offset, int count) { - return get(readSortAsync(order, offset, count)); - } - - @Override - public RFuture> readSortAsync(SortOrder order, int offset, int count) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "LIMIT", offset, count, order); - } - - @Override - public List readSort(String byPattern, SortOrder order) { - return get(readSortAsync(byPattern, order)); - } - - @Override - public RFuture> readSortAsync(String byPattern, SortOrder order) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, order); - } - - @Override - public List readSort(String byPattern, SortOrder order, int offset, int count) { - return get(readSortAsync(byPattern, order, offset, count)); - } - - @Override - public RFuture> readSortAsync(String byPattern, SortOrder order, int offset, int count) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, "LIMIT", offset, count, order); - } - - @Override - public Collection readSort(String byPattern, List getPatterns, SortOrder order) { - return (Collection) get(readSortAsync(byPattern, getPatterns, order)); - } - - @Override - public RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order) { - return readSortAsync(byPattern, getPatterns, order, -1, -1); - } - - @Override - public Collection readSort(String byPattern, List getPatterns, SortOrder order, int offset, int count) { - return (Collection) get(readSortAsync(byPattern, getPatterns, order, offset, count)); - } - - @Override - public RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count) { - return readSortAsync(byPattern, getPatterns, order, offset, count, false); - } - - @Override - public List readSortAlpha(SortOrder order) { - return get(readSortAlphaAsync(order)); - } - - @Override - public RFuture> readSortAlphaAsync(SortOrder order) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "ALPHA", order); - } - - @Override - public List readSortAlpha(SortOrder order, int offset, int count) { - return get(readSortAlphaAsync(order, offset, count)); - } - - @Override - public RFuture> readSortAlphaAsync(SortOrder order, int offset, int count) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "LIMIT", offset, count, "ALPHA", order); - } - - @Override - public List readSortAlpha(String byPattern, SortOrder order) { - return get(readSortAlphaAsync(byPattern, order)); - } - - @Override - public RFuture> readSortAlphaAsync(String byPattern, SortOrder order) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, "ALPHA", order); - } - - @Override - public List readSortAlpha(String byPattern, SortOrder order, int offset, int count) { - return get(readSortAlphaAsync(byPattern, order, offset, count)); - } - - @Override - public RFuture> readSortAlphaAsync(String byPattern, SortOrder order, int offset, int count) { - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, getRawName(), "BY", byPattern, "LIMIT", offset, count, "ALPHA", order); - } - - @Override - public Collection readSortAlpha(String byPattern, List getPatterns, SortOrder order) { - return (Collection) get(readSortAlphaAsync(byPattern, getPatterns, order)); - } - - @Override - public RFuture> readSortAlphaAsync(String byPattern, List getPatterns, SortOrder order) { - return readSortAlphaAsync(byPattern, getPatterns, order, -1, -1); - } - - @Override - public Collection readSortAlpha(String byPattern, List getPatterns, SortOrder order, int offset, int count) { - return (Collection) get(readSortAlphaAsync(byPattern, getPatterns, order, offset, count)); - } - - @Override - public RFuture> readSortAlphaAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count) { - return readSortAsync(byPattern, getPatterns, order, offset, count, true); - } - - @Override - public int sortTo(String destName, SortOrder order) { - return get(sortToAsync(destName, order)); - } - - @Override - public RFuture sortToAsync(String destName, SortOrder order) { - return sortToAsync(destName, null, Collections.emptyList(), order, -1, -1); - } - - @Override - public int sortTo(String destName, SortOrder order, int offset, int count) { - return get(sortToAsync(destName, order, offset, count)); - } - - @Override - public RFuture sortToAsync(String destName, SortOrder order, int offset, int count) { - return sortToAsync(destName, null, Collections.emptyList(), order, offset, count); - } - - @Override - public int sortTo(String destName, String byPattern, SortOrder order, int offset, int count) { - return get(sortToAsync(destName, byPattern, order, offset, count)); - } - - @Override - public int sortTo(String destName, String byPattern, SortOrder order) { - return get(sortToAsync(destName, byPattern, order)); - } - - @Override - public RFuture sortToAsync(String destName, String byPattern, SortOrder order) { - return sortToAsync(destName, byPattern, Collections.emptyList(), order, -1, -1); - } - - @Override - public RFuture sortToAsync(String destName, String byPattern, SortOrder order, int offset, int count) { - return sortToAsync(destName, byPattern, Collections.emptyList(), order, offset, count); - } - - @Override - public int sortTo(String destName, String byPattern, List getPatterns, SortOrder order) { - return get(sortToAsync(destName, byPattern, getPatterns, order)); - } - - @Override - public RFuture sortToAsync(String destName, String byPattern, List getPatterns, SortOrder order) { - return sortToAsync(destName, byPattern, getPatterns, order, -1, -1); - } - - @Override - public int sortTo(String destName, String byPattern, List getPatterns, SortOrder order, int offset, int count) { - return get(sortToAsync(destName, byPattern, getPatterns, order, offset, count)); - } - - @Override - public RFuture sortToAsync(String destName, String byPattern, List getPatterns, SortOrder order, int offset, int count) { - List params = new ArrayList(); - params.add(getRawName()); - if (byPattern != null) { - params.add("BY"); - params.add(byPattern); - } - if (offset != -1 && count != -1) { - params.add("LIMIT"); - } - if (offset != -1) { - params.add(offset); - } - if (count != -1) { - params.add(count); - } - for (String pattern : getPatterns) { - params.add("GET"); - params.add(pattern); - } - params.add(order); - params.add("STORE"); - params.add(destName); - - return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.SORT_TO, params.toArray()); - } - - private RFuture> readSortAsync(String byPattern, List getPatterns, SortOrder order, int offset, int count, boolean alpha) { - List params = new ArrayList(); - params.add(getRawName()); - if (byPattern != null) { - params.add("BY"); - params.add(byPattern); - } - if (offset != -1 && count != -1) { - params.add("LIMIT"); - } - if (offset != -1) { - params.add(offset); - } - if (count != -1) { - params.add(count); - } - if (getPatterns != null) { - for (String pattern : getPatterns) { - params.add("GET"); - params.add(pattern); - } - } - if (alpha) { - params.add("ALPHA"); - } - if (order != null) { - params.add(order); - } - - return commandExecutor.readAsync(getRawName(), codec, RedisCommands.SORT_LIST, params.toArray()); - } - - @Override - public RFuture> rangeAsync(int toIndex) { - return rangeAsync(0, toIndex); - } - - @Override - public RFuture> rangeAsync(int fromIndex, int toIndex) { - return commandExecutor.readAsync(getRawName(), codec, LRANGE, getRawName(), fromIndex, toIndex); - } - - @Override - public List range(int toIndex) { - return get(rangeAsync(toIndex)); - } - - @Override - public List range(int fromIndex, int toIndex) { - return get(rangeAsync(fromIndex, toIndex)); - } - - @Override - public int addListener(ObjectListener listener) { - if (listener instanceof ListAddListener) { - return addListener("__keyevent@*:rpush", (ListAddListener) listener, ListAddListener::onListAdd); - } - if (listener instanceof ListRemoveListener) { - return addListener("__keyevent@*:lrem", (ListRemoveListener) listener, ListRemoveListener::onListRemove); - } - if (listener instanceof ListTrimListener) { - return addListener("__keyevent@*:ltrim", (ListTrimListener) listener, ListTrimListener::onListTrim); - } - if (listener instanceof ListSetListener) { - return addListener("__keyevent@*:lset", (ListSetListener) listener, ListSetListener::onListSet); - } - if (listener instanceof ListInsertListener) { - return addListener("__keyevent@*:linsert", (ListInsertListener) listener, ListInsertListener::onListInsert); - } - return super.addListener(listener); - } - - @Override - public RFuture addListenerAsync(ObjectListener listener) { - if (listener instanceof ListAddListener) { - return addListenerAsync("__keyevent@*:rpush", (ListAddListener) listener, ListAddListener::onListAdd); - } - if (listener instanceof ListRemoveListener) { - return addListenerAsync("__keyevent@*:lrem", (ListRemoveListener) listener, ListRemoveListener::onListRemove); - } - if (listener instanceof ListTrimListener) { - return addListenerAsync("__keyevent@*:ltrim", (ListTrimListener) listener, ListTrimListener::onListTrim); - } - if (listener instanceof ListSetListener) { - return addListenerAsync("__keyevent@*:lset", (ListSetListener) listener, ListSetListener::onListSet); - } - if (listener instanceof ListInsertListener) { - return addListenerAsync("__keyevent@*:linsert", (ListInsertListener) listener, ListInsertListener::onListInsert); - } - return super.addListenerAsync(listener); - } - - @Override - public void removeListener(int listenerId) { - RPatternTopic addTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:rpush"); - addTopic.removeListener(listenerId); - - RPatternTopic remTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lrem"); - remTopic.removeListener(listenerId); - - RPatternTopic trimTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:ltrim"); - trimTopic.removeListener(listenerId); - - RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lset"); - setTopic.removeListener(listenerId); - - RPatternTopic insertTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:linsert"); - insertTopic.removeListener(listenerId); - - super.removeListener(listenerId); - } - - @Override - public RFuture removeListenerAsync(int listenerId) { - RPatternTopic addTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:rpush"); - RFuture f1 = addTopic.removeListenerAsync(listenerId); - - RPatternTopic remTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lrem"); - RFuture f2 = remTopic.removeListenerAsync(listenerId); - - RPatternTopic trimTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:ltrim"); - RFuture f3 = trimTopic.removeListenerAsync(listenerId); - - RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lset"); - RFuture f4 = setTopic.removeListenerAsync(listenerId); - - RPatternTopic insertTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:linsert"); - RFuture f5 = insertTopic.removeListenerAsync(listenerId); - - RFuture f6 = super.removeListenerAsync(listenerId); - - CompletableFuture f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture(), f3.toCompletableFuture(), - f4.toCompletableFuture(), f5.toCompletableFuture(), f5.toCompletableFuture(), f6.toCompletableFuture()); - return new CompletableFutureWrapper<>(f); - } - - @Override - public boolean removeIf(Predicate filter) { - throw new UnsupportedOperationException(); - } } diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java index e8b9bc65a..9f1d1e86b 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java @@ -329,7 +329,4 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement throw new UnsupportedOperationException(); } - public RedissonPriorityDeque reversed() { - throw new UnsupportedOperationException(); - } } diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java index 0d8d111d3..67606649e 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java @@ -38,7 +38,7 @@ import java.util.function.Supplier; * * @param value type */ -public class RedissonPriorityQueue extends RedissonList implements RPriorityQueue { +public class RedissonPriorityQueue extends BaseRedissonList implements RPriorityQueue { public static class BinarySearchResult { diff --git a/redisson/src/main/java/org/redisson/RedissonQueue.java b/redisson/src/main/java/org/redisson/RedissonQueue.java index cf56c84dd..58cbd180f 100644 --- a/redisson/src/main/java/org/redisson/RedissonQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonQueue.java @@ -33,7 +33,7 @@ import java.util.NoSuchElementException; * * @param the type of elements held in this collection */ -public class RedissonQueue extends RedissonList implements RQueue { +public class RedissonQueue extends BaseRedissonList implements RQueue { public RedissonQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name, redisson); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java index 445b91f67..c4c0a99e7 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java @@ -15,14 +15,13 @@ */ package org.redisson.reactive; -import java.util.concurrent.Callable; - +import org.redisson.BaseRedissonList; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; -import org.redisson.api.RListAsync; - import reactor.core.publisher.Flux; +import java.util.concurrent.Callable; + /** * * @author Nikita Koksharov @@ -34,7 +33,7 @@ public class RedissonBlockingQueueReactive extends RedissonListReactive { private final RBlockingQueue queue; public RedissonBlockingQueueReactive(RBlockingQueue queue) { - super((RListAsync) queue); + super((BaseRedissonList) queue); this.queue = queue; } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index c98e91b82..a9d1ae4f6 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -15,18 +15,17 @@ */ package org.redisson.reactive; -import java.util.function.Consumer; -import java.util.function.LongConsumer; - import org.reactivestreams.Publisher; +import org.redisson.BaseRedissonList; import org.redisson.RedissonList; import org.redisson.api.RFuture; -import org.redisson.api.RListAsync; import org.redisson.client.codec.Codec; - import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import java.util.function.Consumer; +import java.util.function.LongConsumer; + /** * Distributed and concurrent implementation of {@link java.util.List} * @@ -36,9 +35,9 @@ import reactor.core.publisher.FluxSink; */ public class RedissonListReactive { - private final RListAsync instance; + private final BaseRedissonList instance; - public RedissonListReactive(RListAsync instance) { + public RedissonListReactive(BaseRedissonList instance) { this.instance = instance; } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java index d813b2ea8..900215eca 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java @@ -15,10 +15,9 @@ */ package org.redisson.rx; -import org.redisson.api.RBlockingQueueAsync; -import org.redisson.api.RListAsync; - import io.reactivex.rxjava3.core.Flowable; +import org.redisson.BaseRedissonList; +import org.redisson.api.RBlockingQueueAsync; /** * @@ -31,7 +30,7 @@ public class RedissonBlockingQueueRx extends RedissonListRx { private final RBlockingQueueAsync queue; public RedissonBlockingQueueRx(RBlockingQueueAsync queue) { - super((RListAsync) queue); + super((BaseRedissonList) queue); this.queue = queue; } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonListRx.java b/redisson/src/main/java/org/redisson/rx/RedissonListRx.java index 558e17265..e4ad27cf1 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonListRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonListRx.java @@ -15,13 +15,12 @@ */ package org.redisson.rx; -import org.reactivestreams.Publisher; -import org.redisson.api.RFuture; -import org.redisson.api.RListAsync; - import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.functions.LongConsumer; import io.reactivex.rxjava3.processors.ReplayProcessor; +import org.reactivestreams.Publisher; +import org.redisson.BaseRedissonList; +import org.redisson.api.RFuture; /** * Distributed and concurrent implementation of {@link java.util.List} @@ -32,9 +31,9 @@ import io.reactivex.rxjava3.processors.ReplayProcessor; */ public class RedissonListRx { - private final RListAsync instance; + private final BaseRedissonList instance; - public RedissonListRx(RListAsync instance) { + public RedissonListRx(BaseRedissonList instance) { this.instance = instance; } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java index 9a5b9fa25..bdfb28a30 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java @@ -44,7 +44,7 @@ public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest { .repeat() .subscribe(); - Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> { assertThat(counter.get()).isEqualTo(100); }); } diff --git a/redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java index 199d1bf70..410e61835 100644 --- a/redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java +++ b/redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java @@ -52,7 +52,7 @@ public class RedissonBlockingDequeRxTest extends BaseRxTest { RBlockingDequeRx blockingDeque = redisson.getBlockingDeque("blocking_deque"); long start = System.currentTimeMillis(); String redisTask = sync(blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS)); - assertThat(System.currentTimeMillis() - start).isBetween(950L, 1500L); + assertThat(System.currentTimeMillis() - start).isBetween(950L, 1600L); assertThat(redisTask).isNull(); }