RMap iterator state comparison fixed. #357

pull/365/head
Nikita 9 years ago
parent f5ec498f6b
commit 084d368cfa

@ -30,7 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -39,11 +41,13 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.Predicate;
import org.redisson.core.RMap;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future;
/**
@ -315,8 +319,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys));
}
MapScanResult<Object, V> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<Object, V>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos);
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
return get(f);
}
@ -329,8 +333,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public Iterator<V> valueIterator() {
return new RedissonMapIterator<K, V, V>(this) {
@Override
V getValue(java.util.Map.Entry<K, V> entry) {
return entry.getValue();
V getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
}
};
}
@ -339,8 +343,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public Iterator<K> keyIterator() {
return new RedissonMapIterator<K, V, K>(this) {
@Override
K getValue(java.util.Map.Entry<K, V> entry) {
return entry.getKey();
K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
}
};
}

@ -37,6 +37,7 @@ import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.CacheGetAllDecoder;
@ -237,7 +238,10 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL,
"local value = redis.call('hget', KEYS[1], ARGV[1]); " +
"local expireDate = redis.call('zscore', KEYS[2], ARGV[1]); "
+ "if expireDate == false then "
+ "local s = string.gmatch(value, '[^:]+'); "
+ "print(s()) " +
// "redis.call('zscore', KEYS[2], ARGV[1]); "
"if expireDate == false then "
+ "expireDate = 92233720368547758; "
+ "end; " +
"return {expireDate, value}; ",
@ -322,6 +326,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL,
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); "
+ "redis.call('zadd', 'test_map', ARGV[1], ARGV[2]); "
+ "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); "
+ "return v",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeoutDate, key, value);
@ -355,8 +360,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
MapScanResult<Object, V> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<Object, V>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,
"local result = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "

@ -16,22 +16,27 @@
package org.redisson;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
public class RedissonMapIterator<K, V, M> implements Iterator<M> {
private Map<K, V> firstValues;
private Iterator<Map.Entry<K, V>> iter;
private Map<ByteBuf, ByteBuf> firstValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> iter;
private long iterPos = 0;
private InetSocketAddress client;
private boolean removeExecuted;
private Map.Entry<K, V> entry;
private Map.Entry<ScanObjectEntry, ScanObjectEntry> entry;
private final RedissonMap<K, V> map;
@ -42,19 +47,27 @@ public class RedissonMapIterator<K, V, M> implements Iterator<M> {
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
MapScanResult<Object, V> res = map.scanIterator(client, iterPos);
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = map.scanIterator(client, iterPos);
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = (Map<K, V>) res.getMap();
} else if (res.getMap().equals(firstValues)) {
firstValues = convert(res.getMap());
} else if (convert(res.getMap()).equals(firstValues)) {
return false;
}
iter = ((Map<K, V>)res.getMap()).entrySet().iterator();
iter = res.getMap().entrySet().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
}
return result;
}
@Override
public M next() {
if (!hasNext()) {
@ -66,8 +79,16 @@ public class RedissonMapIterator<K, V, M> implements Iterator<M> {
return getValue(entry);
}
M getValue(Entry<K, V> entry) {
return (M) entry;
@SuppressWarnings("unchecked")
M getValue(final Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) {
@Override
public V setValue(V value) {
return map.put((K) entry.getKey().getObj(), value);
}
};
}
@Override
@ -79,7 +100,7 @@ public class RedissonMapIterator<K, V, M> implements Iterator<M> {
// lazy init iterator
hasNext();
iter.remove();
map.fastRemove(entry.getKey());
map.fastRemove((K)entry.getKey().getObj());
removeExecuted = true;
}

@ -0,0 +1,78 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
public class ScanCodec implements Codec {
public Codec delegate;
public ScanCodec(Codec delegate) {
super();
this.delegate = delegate;
}
@Override
public Decoder<Object> getValueDecoder() {
return delegate.getValueDecoder();
}
@Override
public Encoder getValueEncoder() {
return delegate.getValueEncoder();
}
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
Object val = delegate.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(buf, val);
}
};
}
@Override
public Encoder getMapValueEncoder() {
return delegate.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(buf, val);
}
};
}
@Override
public Encoder getMapKeyEncoder() {
return delegate.getMapKeyEncoder();
}
}

@ -0,0 +1,38 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import io.netty.buffer.ByteBuf;
public class ScanObjectEntry {
private final ByteBuf buf;
private final Object obj;
public ScanObjectEntry(ByteBuf buf, Object obj) {
this.buf = buf;
this.obj = obj;
}
public ByteBuf getBuf() {
return buf;
}
public Object getObj() {
return obj;
}
}
Loading…
Cancel
Save