Refactoring

pull/766/head
Nikita 8 years ago
parent 3ab9de7b1e
commit 53f98a3e70

@ -81,6 +81,10 @@ public abstract class RedissonObject implements RObject {
public String getName() {
return name;
}
protected String getName(Object o) {
return getName();
}
@Override
public void rename(String newName) {

@ -44,7 +44,7 @@ import org.redisson.command.CommandAsyncExecutor;
*
* @param <V> value
*/
public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIterator {
protected RedissonSet(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
@ -83,7 +83,8 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return getName();
}
ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
return get(f);
}

@ -57,8 +57,16 @@ import org.redisson.eviction.EvictionScheduler;
*
* @param <V> value
*/
public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<V> {
public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<V>, ScanIterator {
RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
evictionScheduler.schedule(getName());
@ -91,7 +99,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override
public RFuture<Boolean> containsAsync(Object o) {
return commandExecutor.evalReadAsync(getName(), codec, new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
return commandExecutor.evalReadAsync(getName(o), codec, new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " +
"if expireDateScore ~= false then " +
"if tonumber(expireDateScore) <= tonumber(ARGV[1]) then " +
@ -102,16 +110,16 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
"else " +
"return 0;" +
"end; ",
Arrays.<Object>asList(getName()), System.currentTimeMillis(), o);
Arrays.<Object>asList(getName(o)), System.currentTimeMillis(), o);
}
ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(client, startPos);
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(name, client, startPos);
return get(f);
}
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, name, new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
"local result = {}; "
+ "local res = redis.call('zscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
@ -122,7 +130,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
+ "end; "
+ "end;"
+ "end;"
+ "return {res[1], result};", Arrays.<Object>asList(getName()), startPos, System.currentTimeMillis());
+ "return {res[1], result};", Arrays.<Object>asList(name), startPos, System.currentTimeMillis());
}
@Override
@ -131,7 +139,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
return scanIterator(getName(), client, nextIterPos);
}
@Override
@ -149,27 +157,18 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override
public RFuture<Set<V>> readAllAsync() {
return (RFuture<Set<V>>)readAllAsync(RedisCommands.ZRANGEBYSCORE);
}
private RFuture<?> readAllAsync(RedisCommand<? extends Collection<?>> command) {
return commandExecutor.readAsync(getName(), codec, command, getName(), System.currentTimeMillis(), 92233720368547758L);
}
private RFuture<List<Object>> readAllasListAsync() {
return (RFuture<List<Object>>)readAllAsync(RedisCommands.ZRANGEBYSCORE_LIST);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), System.currentTimeMillis(), 92233720368547758L);
}
@Override
public Object[] toArray() {
List<Object> res = get(readAllasListAsync());
Set<V> res = get(readAllAsync());
return res.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<Object> res = get(readAllasListAsync());
Set<V> res = get(readAllAsync());
return res.toArray(a);
}
@ -199,14 +198,14 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
byte[] objectState = encode(value);
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(value), codec, RedisCommands.EVAL_BOOLEAN,
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " +
"redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " +
"if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then " +
"return 0;" +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, objectState);
Arrays.<Object>asList(getName(value)), System.currentTimeMillis(), timeoutDate, objectState);
}
@Override
@ -216,7 +215,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override
public RFuture<Boolean> removeAsync(Object o) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZREM, getName(), o);
return commandExecutor.writeAsync(getName(o), codec, RedisCommands.ZREM, getName(o), o);
}
@Override

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.InetSocketAddress;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
public interface ScanIterator {
ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos);
boolean remove(Object value);
}

@ -78,7 +78,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
}
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(InetSocketAddress client, long startPos) {
return reactive(instance.scanIteratorAsync(client, startPos));
return reactive(instance.scanIteratorAsync(getName(), client, startPos));
}
@Override

Loading…
Cancel
Save