refactoring

pull/482/head
Nikita 9 years ago
parent b870b9cc08
commit 3304e3caef

@ -34,7 +34,6 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> lastValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> lastIter;
protected long nextIterPos;
protected long startPos = -1;
protected InetSocketAddress client;
private boolean finished;
@ -44,13 +43,22 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
@Override
public boolean hasNext() {
if (finished) {
return false;
}
if (lastIter == null || !lastIter.hasNext()) {
if (nextIterPos == -1) {
return false;
if (finished) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
removeExecuted = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
if (!tryAgain()) {
return false;
}
finished = false;
}
long prevIterPos;
do {
@ -61,22 +69,36 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
}
lastValues = convert(res.getMap());
client = res.getRedisClient();
if (startPos == -1) {
startPos = res.getPos();
}
if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty() && tryAgain()) {
client = null;
firstValues = null;
prevIterPos = -1;
}
} else {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty() && tryAgain()) {
continue;
}
} else if (lastValues.keySet().removeAll(firstValues.keySet())) {
finished = true;
free(firstValues);
free(lastValues);
currentElementRemoved = false;
removeExecuted = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
prevIterPos = -1;
if (tryAgain()) {
continue;
}
finished = true;
return false;
}
}
@ -84,16 +106,23 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
nextIterPos = res.getPos();
} while (!lastIter.hasNext() && nextIterPos != prevIterPos);
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
finished = true;
}
}
return lastIter.hasNext();
}
protected boolean tryAgain() {
return false;
}
protected abstract MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator();
private void free(Map<ByteBuf, ByteBuf> map) {
if (map == null) {
return;
}
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();

@ -95,7 +95,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Boolean> containsKeyAsync(Object key) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.HEXISTS, getName(), key);
return commandExecutor.readAsync(getName(key), codec, RedisCommands.HEXISTS, getName(key), key);
}
@Override
@ -226,13 +226,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> putIfAbsentAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
"if redis.call('hsetnx', KEYS[1], ARGV[1], ARGV[2]) == 1 then "
+ "return nil "
+ "else "
+ "return redis.call('hget', KEYS[1], ARGV[1]) "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
Collections.<Object>singletonList(getName(key)), key, value);
}
@Override
@ -242,7 +242,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Boolean> fastPutIfAbsentAsync(K key, V value) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.HSETNX, getName(), key, value);
return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSETNX, getName(key), key, value);
}
@Override
@ -252,13 +252,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Long> removeAsync(Object key, Object value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE,
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
Collections.<Object>singletonList(getName(key)), key, value);
}
@Override
@ -268,14 +268,14 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Boolean> replaceAsync(K key, V oldValue, V newValue) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE,
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
Collections.<Object>singletonList(getName(key)), key, oldValue, newValue);
}
@Override
@ -285,7 +285,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> replaceAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE,
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
@ -293,36 +293,40 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
+ "else "
+ "return nil; "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
Collections.<Object>singletonList(getName(key)), key, value);
}
@Override
public Future<V> getAsync(K key) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.HGET, getName(), key);
return commandExecutor.readAsync(getName(key), codec, RedisCommands.HGET, getName(key), key);
}
protected String getName(Object key) {
return getName();
}
@Override
public Future<V> putAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v",
Collections.<Object>singletonList(getName()), key, value);
Collections.<Object>singletonList(getName(key)), key, value);
}
@Override
public Future<V> removeAsync(K key) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "return v",
Collections.<Object>singletonList(getName()), key);
Collections.<Object>singletonList(getName(key)), key);
}
@Override
public Future<Boolean> fastPutAsync(K key, V value) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.HSET, getName(), key, value);
return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSET, getName(key), key, value);
}
@Override
@ -347,8 +351,9 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys));
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos);
return get(f);
}
@ -423,9 +428,9 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public Future<V> addAndGetAsync(K key, Number value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE,
return commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(), keyState, new BigDecimal(value.toString()).toPlainString());
getName(key), keyState, new BigDecimal(value.toString()).toPlainString());
} catch (IOException e) {
throw new IllegalArgumentException(e);
}

@ -530,7 +530,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
Future<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,

@ -29,7 +29,7 @@ public class RedissonMapIterator<K, V, M> extends RedissonBaseMapIterator<K, V,
}
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() {
return map.scanIterator(client, nextIterPos);
return map.scanIterator(map.getName(), client, nextIterPos);
}
protected void removeKey() {

Loading…
Cancel
Save