diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 3428d15ac..79915fedf 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -81,6 +81,10 @@ public abstract class RedissonObject implements RObject { public String getName() { return name; } + + protected String getName(Object o) { + return getName(); + } @Override public void rename(String newName) { diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 524cae608..590461ccf 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -44,7 +44,7 @@ import org.redisson.command.CommandAsyncExecutor; * * @param value */ -public class RedissonSet extends RedissonExpirable implements RSet { +public class RedissonSet extends RedissonExpirable implements RSet, ScanIterator { protected RedissonSet(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); @@ -83,7 +83,8 @@ public class RedissonSet extends RedissonExpirable implements RSet { return getName(); } - ListScanResult scanIterator(String name, InetSocketAddress client, long startPos) { + @Override + public ListScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RFuture> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index e31073248..131b5ab96 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -57,8 +57,16 @@ import org.redisson.eviction.EvictionScheduler; * * @param value */ -public class RedissonSetCache extends RedissonExpirable implements RSetCache { +public class RedissonSetCache extends RedissonExpirable implements RSetCache, ScanIterator { + RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); evictionScheduler.schedule(getName()); @@ -91,7 +99,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override public RFuture containsAsync(Object o) { - return commandExecutor.evalReadAsync(getName(), codec, new RedisStrictCommand("EVAL", new BooleanReplayConvertor(), 5), + return commandExecutor.evalReadAsync(getName(o), codec, new RedisStrictCommand("EVAL", new BooleanReplayConvertor(), 5), "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + "if expireDateScore ~= false then " + "if tonumber(expireDateScore) <= tonumber(ARGV[1]) then " + @@ -102,16 +110,16 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< "else " + "return 0;" + "end; ", - Arrays.asList(getName()), System.currentTimeMillis(), o); + Arrays.asList(getName(o)), System.currentTimeMillis(), o); } - ListScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = scanIteratorAsync(client, startPos); + public ListScanResult scanIterator(String name, InetSocketAddress client, long startPos) { + RFuture> f = scanIteratorAsync(name, client, startPos); return get(f); } - public RFuture> scanIteratorAsync(InetSocketAddress client, long startPos) { - return commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), RedisCommands.EVAL_ZSCAN, + public RFuture> scanIteratorAsync(String name, InetSocketAddress client, long startPos) { + return commandExecutor.evalReadAsync(client, name, new ScanCodec(codec), RedisCommands.EVAL_ZSCAN, "local result = {}; " + "local res = redis.call('zscan', KEYS[1], ARGV[1]); " + "for i, value in ipairs(res[2]) do " @@ -122,7 +130,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< + "end; " + "end;" + "end;" - + "return {res[1], result};", Arrays.asList(getName()), startPos, System.currentTimeMillis()); + + "return {res[1], result};", Arrays.asList(name), startPos, System.currentTimeMillis()); } @Override @@ -131,7 +139,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override ListScanResult iterator(InetSocketAddress client, long nextIterPos) { - return scanIterator(client, nextIterPos); + return scanIterator(getName(), client, nextIterPos); } @Override @@ -149,27 +157,18 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override public RFuture> readAllAsync() { - return (RFuture>)readAllAsync(RedisCommands.ZRANGEBYSCORE); - } - - private RFuture readAllAsync(RedisCommand> command) { - return commandExecutor.readAsync(getName(), codec, command, getName(), System.currentTimeMillis(), 92233720368547758L); - } - - - private RFuture> readAllasListAsync() { - return (RFuture>)readAllAsync(RedisCommands.ZRANGEBYSCORE_LIST); + return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), System.currentTimeMillis(), 92233720368547758L); } @Override public Object[] toArray() { - List res = get(readAllasListAsync()); + Set res = get(readAllAsync()); return res.toArray(); } @Override public T[] toArray(T[] a) { - List res = get(readAllasListAsync()); + Set res = get(readAllAsync()); return res.toArray(a); } @@ -199,14 +198,14 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< byte[] objectState = encode(value); long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl); - return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWriteAsync(getName(value), codec, RedisCommands.EVAL_BOOLEAN, "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " + "redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then " + "return 0;" + "end; " + "return 1; ", - Arrays.asList(getName()), System.currentTimeMillis(), timeoutDate, objectState); + Arrays.asList(getName(value)), System.currentTimeMillis(), timeoutDate, objectState); } @Override @@ -216,7 +215,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override public RFuture removeAsync(Object o) { - return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZREM, getName(), o); + return commandExecutor.writeAsync(getName(o), codec, RedisCommands.ZREM, getName(o), o); } @Override diff --git a/redisson/src/main/java/org/redisson/ScanIterator.java b/redisson/src/main/java/org/redisson/ScanIterator.java new file mode 100644 index 000000000..5f06e8d45 --- /dev/null +++ b/redisson/src/main/java/org/redisson/ScanIterator.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.net.InetSocketAddress; + +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; + +public interface ScanIterator { + + ListScanResult scanIterator(String name, InetSocketAddress client, long startPos); + + boolean remove(Object value); + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 36b401aec..dcd390e1d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -78,7 +78,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple } Publisher> scanIterator(InetSocketAddress client, long startPos) { - return reactive(instance.scanIteratorAsync(client, startPos)); + return reactive(instance.scanIteratorAsync(getName(), client, startPos)); } @Override