Merge branch 'mrniko/master' into feature/travis-ci

pull/509/head
Rui Gu 9 years ago
commit 7da39e4053

@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -32,7 +31,8 @@ import io.netty.buffer.ByteBuf;
abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> iter;
private Map<ByteBuf, ByteBuf> lastValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> lastIter;
protected long nextIterPos;
protected long startPos = -1;
protected InetSocketAddress client;
@ -48,7 +48,7 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
return false;
}
if (iter == null || !iter.hasNext()) {
if (lastIter == null || !lastIter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
@ -56,31 +56,38 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
do {
prevIterPos = nextIterPos;
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getMap());
client = res.getRedisClient();
if (startPos == -1) {
startPos = res.getPos();
}
if (nextIterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap());
firstValues = lastValues;
lastValues = null;
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
if (firstValues.entrySet().containsAll(newValues.entrySet())) {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
} else if (lastValues.keySet().removeAll(firstValues.keySet())) {
finished = true;
free(firstValues);
free(newValues);
free(lastValues);
firstValues = null;
lastValues = null;
return false;
}
free(newValues);
}
iter = res.getMap().entrySet().iterator();
lastIter = res.getMap().entrySet().iterator();
nextIterPos = res.getPos();
} while (!iter.hasNext() && nextIterPos != prevIterPos);
} while (!lastIter.hasNext() && nextIterPos != prevIterPos);
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
return lastIter.hasNext();
}
@ -107,7 +114,7 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
throw new NoSuchElementException("No such element at index");
}
entry = iter.next();
entry = lastIter.next();
currentElementRemoved = false;
return getValue(entry);
}
@ -129,11 +136,12 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
if (lastIter == null) {
throw new IllegalStateException();
}
iter.remove();
firstValues.remove(entry.getKey().getBuf());
lastIter.remove();
removeKey();
currentElementRemoved = true;
removeExecuted = true;

@ -62,7 +62,7 @@ public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
@Override
public Future<Long> addAsync(double longitude, double latitude, V member) {
return commandExecutor.writeAsync(getName(), RedisCommands.GEOADD, getName(), convert(longitude), convert(latitude), member);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.GEOADD, getName(), convert(longitude), convert(latitude), member);
}
private String convert(double longitude) {

@ -18,6 +18,7 @@ package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -33,9 +34,14 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapCacheScanResult;
import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
@ -43,6 +49,7 @@ import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.RMapCache;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
* <p>Map-based cache with ability to set TTL for each entry via
@ -69,7 +76,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<MapScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapScanResult<Object, Object>>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
private static final RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new ObjectListReplayDecoder()), ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE);
@ -168,10 +175,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args, 1), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" +
"local expireIdleHead = redis.call('zrange', KEYS[3], 0, 0, 'withscores');" +
"local maxDate = table.remove(ARGV, 1); " // index is the first parameter
+ "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= tonumber(maxDate); "
+ "local hasExpireIdle = #expireIdleHead == 2 and tonumber(expireIdleHead[2]) <= tonumber(maxDate); "
"local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter
+ "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= currentTime; "
+ "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); "
+ "for i = #map, 1, -1 do "
+ "local value = map[i]; "
@ -182,18 +187,18 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "if hasExpire then "
+ "local expireDate = redis.call('zscore', KEYS[2], key); "
+ "if expireDate ~= false and tonumber(expireDate) <= tonumber(maxDate) then "
+ "if expireDate ~= false and tonumber(expireDate) <= currentTime then "
+ "map[i] = false; "
+ "end; "
+ "end; "
+ "if hasExpireIdle and t ~= 0 then "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "if tonumber(expireIdle) > currentTime then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "redis.call('zadd', KEYS[3], t + currentTime, key); "
+ "else "
+ "map[i] = false; "
+ "end; "
@ -526,9 +531,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_HSCAN,
RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
Future<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,
"local result = {}; "
+ "local idleKeys = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[2]); "
+ "local currentTime = tonumber(ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
+ "if i % 2 == 0 then "
+ "local key = res[2][i-1]; " +
@ -542,22 +551,61 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "if tonumber(expireIdle) > currentTime and expireDate > currentTime then "
+ "table.insert(idleKeys, key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "if expireDate > currentTime then "
+ "table.insert(result, key); "
+ "table.insert(result, val); "
+ "end; "
+ "end; "
+ "end;"
+ "return {res[1], result};", Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), startPos);
+ "return {res[1], result, idleKeys};", Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), startPos);
f.addListener(new FutureListener<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>>() {
@Override
public void operationComplete(Future<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> future)
throws Exception {
if (future.isSuccess()) {
MapCacheScanResult<ScanObjectEntry, ScanObjectEntry> res = future.getNow();
if (res.getIdleKeys().isEmpty()) {
return;
}
List<Object> args = new ArrayList<Object>(res.getIdleKeys().size() + 1);
args.add(System.currentTimeMillis());
args.addAll(res.getIdleKeys());
commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args, 1), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter
+ "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); "
+ "for i = #map, 1, -1 do "
+ "local value = map[i]; "
+ "if value ~= false then "
+ "local key = ARGV[i]; "
+ "local t, val = struct.unpack('dLc0', value); "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[2], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > currentTime then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[2], t + currentTime, key); "
+ "end; "
+ "end; "
+ "end; "
+ "end; "
+ "end; ",
Arrays.<Object>asList(getName(), getIdleSetName()), args.toArray());
}
}
});
return get(f);
}
@ -691,4 +739,73 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()));
}
@Override
public Future<Set<java.util.Map.Entry<K, V>>> readAllEntrySetAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_ENTRY,
"local s = redis.call('hgetall', KEYS[1]); "
+ "local result = {}; "
+ "for i, v in ipairs(s) do "
+ "if i % 2 == 0 then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local key = s[i-1];" +
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], key); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "table.insert(result, key); "
+ "table.insert(result, val); "
+ "end; "
+ "end; "
+ "end;" +
"return result;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis());
}
@Override
public Future<Collection<V>> readAllValuesAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
"local s = redis.call('hgetall', KEYS[1]); "
+ "local result = {}; "
+ "for i, v in ipairs(s) do "
+ "if i % 2 == 0 then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local key = s[i-1];" +
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], key); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "table.insert(result, val); "
+ "end; "
+ "end; "
+ "end;" +
"return result;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis());
}
}

@ -35,6 +35,7 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
@ -90,7 +91,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
makeCheckpoint = false;
} else {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
if (cmd.getCommand().getReplayMultiDecoder() != null && NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())) {
if (cmd.getCommand().getReplayMultiDecoder() != null
&& (NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
|| ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) {
makeCheckpoint = false;
}
}
@ -139,7 +142,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
StateLevel firstLevel = state().getLevels().get(0);
StateLevel secondLevel = state().getLevels().get(1);
decodeMulti(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());
Channel channel = ctx.channel();
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts(), channel);
@ -156,7 +159,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().resetLevel();
decode(in, cmd, null, ctx.channel());
} else {
decodeMulti(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
}
}
}
@ -267,13 +270,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
decodeMulti(in, data, parts, channel, size, respParts);
decodeList(in, data, parts, channel, size, respParts);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}
}
private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts)
throws IOException {
for (int i = respParts.size(); i < size; i++) {

@ -179,6 +179,7 @@ public interface RedisCommands {
RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder());
RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL");
RedisCommand<Object> EVAL_MAP_VALUE = new RedisCommand<Object>("EVAL", ValueType.MAP_VALUE);
RedisCommand<Set<Entry<Object, Object>>> EVAL_MAP_ENTRY = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> EVAL_MAP_VALUE_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE);
RedisStrictCommand<Long> INCR = new RedisStrictCommand<Long>("INCR");

@ -0,0 +1,115 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ListMultiDecoder<T> implements MultiDecoder<Object> {
private final MultiDecoder<?>[] decoders;
public static class NestedDecoderState implements DecoderState {
int index = -1;
int partsIndex = -1;
public NestedDecoderState() {
}
public NestedDecoderState(int index) {
super();
this.index = index;
}
public void resetPartsIndex() {
partsIndex = -1;
}
public int incPartsIndex() {
return ++partsIndex;
}
public int getPartsIndex() {
return partsIndex;
}
public int incIndex() {
return ++index;
}
public int getIndex() {
return index;
}
@Override
public DecoderState copy() {
return new NestedDecoderState(index);
}
@Override
public String toString() {
return "NestedDecoderState [index=" + index + "]";
}
}
protected final NestedDecoderState getDecoder(State state) {
NestedDecoderState ds = state.getDecoderState();
if (ds == null) {
ds = new NestedDecoderState();
state.setDecoderState(ds);
}
return ds;
}
public ListMultiDecoder(MultiDecoder<?> ... decoders) {
this.decoders = decoders;
}
public Object decode(ByteBuf buf, State state) throws IOException {
int index = getDecoder(state).getIndex();
return decoders[index].decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
if (paramNum == 0) {
NestedDecoderState s = getDecoder(state);
s.incIndex();
s.resetPartsIndex();
}
return true;
}
@Override
public Object decode(List<Object> parts, State state) {
NestedDecoderState s = getDecoder(state);
int index = s.getIndex();
index += s.incPartsIndex();
Object res = decoders[index].decode(parts, state);
if (res == null) {
index = s.incIndex() + s.getPartsIndex();
return decoders[index].decode(parts, state);
}
return res;
}
}

@ -0,0 +1,43 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class LongMultiDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return LongCodec.INSTANCE.getValueDecoder().decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
@Override
public Object decode(List<Object> parts, State state) {
return null;
}
}

@ -0,0 +1,34 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import java.util.Map;
public class MapCacheScanResult<K, V> extends MapScanResult<K, V> {
private final List<K> idleKeys;
public MapCacheScanResult(Long pos, Map<K, V> values, List<K> idleKeys) {
super(pos, values);
this.idleKeys = idleKeys;
};
public List<K> getIdleKeys() {
return idleKeys;
}
}

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class MapCacheScanResultReplayDecoder implements MultiDecoder<MapCacheScanResult<Object, Object>> {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public MapCacheScanResult<Object, Object> decode(List<Object> parts, State state) {
Long pos = (Long)parts.get(0);
Map<Object, Object> values = (Map<Object, Object>)parts.get(1);
List<Object> idleKeys = (List<Object>) parts.get(2);
return new MapCacheScanResult<Object, Object>(pos, values, idleKeys);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -0,0 +1,50 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ObjectListDecoder<T> implements MultiDecoder<List<T>> {
private Codec codec;
public ObjectListDecoder(Codec codec) {
super();
this.codec = codec;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return codec.getMapKeyDecoder().decode(buf, state);
}
@Override
public List<T> decode(List<Object> parts, State state) {
return (List<T>) parts;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -0,0 +1,63 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ObjectMapDecoder implements MultiDecoder<Map<Object, Object>> {
private Codec codec;
public ObjectMapDecoder(Codec codec) {
super();
this.codec = codec;
}
private int pos;
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
if (pos++ % 2 == 0) {
return codec.getMapKeyDecoder().decode(buf, state);
}
return codec.getMapValueDecoder().decode(buf, state);
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new HashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
result.put(parts.get(i-1), parts.get(i));
}
}
return result;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -14,8 +14,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.codec.JsonJacksonCodec;
@ -127,13 +125,20 @@ public class RedissonMapCacheTest extends BaseTest {
}
}
@Test
public void testCacheValues() {
final RMapCache<String, String> map = redisson.getMapCache("testRMapCacheValues");
map.put("1234", "5678", 0, TimeUnit.MINUTES, 60, TimeUnit.MINUTES);
assertThat(map.values()).containsOnly("5678");
}
@Test
public void testGetAll() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("getAll");
map.put(1, 100);
map.put(2, 200, 1, TimeUnit.SECONDS);
map.put(3, 300, 1, TimeUnit.SECONDS);
map.put(3, 300, 1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
map.put(4, 400);
Map<Integer, Integer> filtered = map.getAll(new HashSet<Integer>(Arrays.asList(2, 3, 5)));
@ -222,6 +227,28 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertEquals(0, map.size());
}
@Test
public void testIteratorRandomRemoveFirst() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("simpleMap");
for (int i = 0; i < 1000; i++) {
map.put(i, i*10);
}
int cnt = 0;
int removed = 0;
Iterator<Entry<Integer, Integer>> iterator = map.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Integer, Integer> entry = iterator.next();
if (cnt < 20) {
iterator.remove();
removed++;
}
cnt++;
}
Assert.assertEquals(1000, cnt);
assertThat(map.size()).isEqualTo(cnt - removed);
}
@Test
public void testIteratorRandomRemoveHighVolume() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("simpleMap");
@ -265,10 +292,12 @@ public class RedissonMapCacheTest extends BaseTest {
map.put(2, "33", 1, TimeUnit.SECONDS);
map.put(3, "43");
Assert.assertEquals(3, map.entrySet().size());
MatcherAssert.assertThat(map, Matchers.hasEntry(Matchers.equalTo(1), Matchers.equalTo("12")));
MatcherAssert.assertThat(map, Matchers.hasEntry(Matchers.equalTo(3), Matchers.equalTo("43")));
Map<Integer, String> expected = new HashMap<>();
map.put(1, "12");
map.put(3, "43");
assertThat(map.entrySet()).containsAll(expected.entrySet());
assertThat(map).hasSize(3);
}
@Test
@ -297,7 +326,7 @@ public class RedissonMapCacheTest extends BaseTest {
joinMap.put(6, "6");
map.putAll(joinMap);
MatcherAssert.assertThat(map.keySet(), Matchers.containsInAnyOrder(1, 2, 3, 4, 5, 6));
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6);
}
@Test
@ -579,47 +608,6 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertEquals(1, map.size());
}
// @Test
// public void testKeyIterator() {
// RMap<Integer, Integer> map = redisson.getCache("simple");
// map.put(1, 0);
// map.put(3, 5);
// map.put(4, 6);
// map.put(7, 8);
//
// Collection<Integer> keys = map.keySet();
// MatcherAssert.assertThat(keys, Matchers.containsInAnyOrder(1, 3, 4, 7));
// for (Iterator<Integer> iterator = map.keyIterator(); iterator.hasNext();) {
// Integer value = iterator.next();
// if (!keys.remove(value)) {
// Assert.fail();
// }
// }
//
// Assert.assertEquals(0, keys.size());
// }
// @Test
// public void testValueIterator() {
// RCache<Integer, Integer> map = redisson.getCache("simple");
// map.put(1, 0);
// map.put(3, 5);
// map.put(4, 6);
// map.put(7, 8);
//
// Collection<Integer> values = map.values();
// MatcherAssert.assertThat(values, Matchers.containsInAnyOrder(0, 5, 6, 8));
// for (Iterator<Integer> iterator = map.valueIterator(); iterator.hasNext();) {
// Integer value = iterator.next();
// if (!values.remove(value)) {
// Assert.fail();
// }
// }
//
// Assert.assertEquals(0, values.size());
// }
@Test
public void testFastPutIfAbsent() throws Exception {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
@ -767,6 +755,7 @@ public class RedissonMapCacheTest extends BaseTest {
}
@Test
public void testRMapCacheValues() {
final RMapCache<String, String> map = redisson.getMapCache("testRMapCacheValues");
@ -775,7 +764,18 @@ public class RedissonMapCacheTest extends BaseTest {
}
@Test
public void testRMapCacheAllValues() {
public void testReadAllEntrySet() throws InterruptedException {
RMapCache<Integer, String> map = redisson.getMapCache("simple12");
map.put(1, "12");
map.put(2, "33", 10, TimeUnit.MINUTES, 60, TimeUnit.MINUTES);
map.put(3, "43");
assertThat(map.readAllEntrySet()).isEqualTo(map.entrySet());
}
@Test
public void testReadAllValues() {
final RMapCache<String, String> map = redisson.getMapCache("testRMapCacheAllValues");
map.put("1234", "5678", 1, TimeUnit.MINUTES, 60, TimeUnit.MINUTES);
assertThat(map.readAllValues()).containsOnly("5678");

@ -11,6 +11,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -401,6 +402,18 @@ public class RedissonMapTest extends BaseTest {
Map<SimpleKey, SimpleValue> testMap = new HashMap<>(map);
assertThat(map.readAllKeySet()).containsOnlyElementsOf(testMap.keySet());
}
@Test
public void testReadAllKeySetHighAmount() {
RMap<SimpleKey, SimpleValue> map = redisson.getMap("simple");
for (int i = 0; i < 1000; i++) {
map.put(new SimpleKey("" + i), new SimpleValue("" + i));
}
assertThat(map.readAllKeySet().size()).isEqualTo(1000);
Map<SimpleKey, SimpleValue> testMap = new HashMap<>(map);
assertThat(map.readAllKeySet()).containsOnlyElementsOf(testMap.keySet());
}
@Test
public void testReadAllValues() {

Loading…
Cancel
Save