From 904d58d8e94814c5271ef74dac9e7a1bc925557a Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 19 Apr 2016 11:54:36 +0300 Subject: [PATCH 1/3] RMapCache iterator fixed. readAll methods fixed. #471 --- .../java/org/redisson/RedissonMapCache.java | 153 +++++++++++++++--- .../client/handler/CommandDecoder.java | 13 +- .../client/protocol/RedisCommands.java | 1 + .../protocol/decoder/ListMultiDecoder.java | 115 +++++++++++++ .../protocol/decoder/LongMultiDecoder.java | 43 +++++ .../protocol/decoder/MapCacheScanResult.java | 34 ++++ .../MapCacheScanResultReplayDecoder.java | 46 ++++++ .../protocol/decoder/ObjectListDecoder.java | 50 ++++++ .../protocol/decoder/ObjectMapDecoder.java | 63 ++++++++ .../org/redisson/RedissonMapCacheTest.java | 78 ++++----- 10 files changed, 523 insertions(+), 73 deletions(-) create mode 100644 src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/LongMultiDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResult.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java index 0167015c8..0b5e495f6 100644 --- a/src/main/java/org/redisson/RedissonMapCache.java +++ b/src/main/java/org/redisson/RedissonMapCache.java @@ -18,6 +18,7 @@ package org.redisson; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -33,9 +34,14 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; +import org.redisson.client.protocol.decoder.ListMultiDecoder; +import org.redisson.client.protocol.decoder.LongMultiDecoder; +import org.redisson.client.protocol.decoder.MapCacheScanResult; +import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; -import org.redisson.client.protocol.decoder.NestedMultiDecoder; +import org.redisson.client.protocol.decoder.ObjectListDecoder; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectMapDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; @@ -43,6 +49,7 @@ import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.core.RMapCache; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; /** *

Map-based cache with ability to set TTL for each entry via @@ -69,7 +76,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); private static final RedisCommand EVAL_HMSET = new RedisCommand("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP); - private static final RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); + private static final RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new ObjectListReplayDecoder()), ValueType.MAP); private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 5, ValueType.MAP); private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE); @@ -168,10 +175,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand>("EVAL", new MapGetAllDecoder(args, 1), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE), "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" + - "local expireIdleHead = redis.call('zrange', KEYS[3], 0, 0, 'withscores');" + - "local maxDate = table.remove(ARGV, 1); " // index is the first parameter - + "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= tonumber(maxDate); " - + "local hasExpireIdle = #expireIdleHead == 2 and tonumber(expireIdleHead[2]) <= tonumber(maxDate); " + "local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter + + "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= currentTime; " + "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); " + "for i = #map, 1, -1 do " + "local value = map[i]; " @@ -182,18 +187,18 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "if hasExpire then " + "local expireDate = redis.call('zscore', KEYS[2], key); " - + "if expireDate ~= false and tonumber(expireDate) <= tonumber(maxDate) then " + + "if expireDate ~= false and tonumber(expireDate) <= currentTime then " + "map[i] = false; " + "end; " + "end; " - + "if hasExpireIdle and t ~= 0 then " + + "if t ~= 0 then " + "local expireIdle = redis.call('zscore', KEYS[3], key); " + "if expireIdle ~= false then " - + "if tonumber(expireIdle) > tonumber(ARGV[1]) then " + + "if tonumber(expireIdle) > currentTime then " + "local value = struct.pack('dLc0', t, string.len(val), val); " + "redis.call('hset', KEYS[1], key, value); " - + "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); " + + "redis.call('zadd', KEYS[3], t + currentTime, key); " + "else " + "map[i] = false; " + "end; " @@ -526,9 +531,13 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override MapScanResult scanIterator(InetSocketAddress client, long startPos) { - Future> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_HSCAN, + RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", + new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); + Future> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN, "local result = {}; " + + "local idleKeys = {}; " + "local res = redis.call('hscan', KEYS[1], ARGV[2]); " + + "local currentTime = tonumber(ARGV[1]); " + "for i, value in ipairs(res[2]) do " + "if i % 2 == 0 then " + "local key = res[2][i-1]; " + @@ -542,22 +551,61 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "if t ~= 0 then " + "local expireIdle = redis.call('zscore', KEYS[3], key); " + "if expireIdle ~= false then " - + "if tonumber(expireIdle) > tonumber(ARGV[1]) then " - + "local value = struct.pack('dLc0', t, string.len(val), val); " - + "redis.call('hset', KEYS[1], key, value); " - + "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); " + + "if tonumber(expireIdle) > currentTime and expireDate > currentTime then " + + "table.insert(idleKeys, key); " + "end; " + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + "end; " + "end; " - + "if expireDate > tonumber(ARGV[1]) then " + + "if expireDate > currentTime then " + "table.insert(result, key); " + "table.insert(result, val); " + "end; " + "end; " + "end;" - + "return {res[1], result};", Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), startPos); + + "return {res[1], result, idleKeys};", Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), startPos); + + f.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) + throws Exception { + if (future.isSuccess()) { + MapCacheScanResult res = future.getNow(); + if (res.getIdleKeys().isEmpty()) { + return; + } + + List args = new ArrayList(res.getIdleKeys().size() + 1); + args.add(System.currentTimeMillis()); + args.addAll(res.getIdleKeys()); + + commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand>("EVAL", new MapGetAllDecoder(args, 1), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE), + "local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter + + "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); " + + "for i = #map, 1, -1 do " + + "local value = map[i]; " + + "if value ~= false then " + + "local key = ARGV[i]; " + + "local t, val = struct.unpack('dLc0', value); " + + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[2], key); " + + "if expireIdle ~= false then " + + "if tonumber(expireIdle) > currentTime then " + + "local value = struct.pack('dLc0', t, string.len(val), val); " + + "redis.call('hset', KEYS[1], key, value); " + + "redis.call('zadd', KEYS[2], t + currentTime, key); " + + "end; " + + "end; " + + "end; " + + "end; " + + "end; ", + Arrays.asList(getName(), getIdleSetName()), args.toArray()); + + } + } + }); return get(f); } @@ -691,4 +739,73 @@ public class RedissonMapCache extends RedissonMap implements RMapCac Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName())); } + @Override + public Future>> readAllEntrySetAsync() { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_ENTRY, + "local s = redis.call('hgetall', KEYS[1]); " + + "local result = {}; " + + "for i, v in ipairs(s) do " + + "if i % 2 == 0 then " + + "local t, val = struct.unpack('dLc0', v); " + + "local key = s[i-1];" + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], key); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[3], key); " + + "if expireIdle ~= false then " + + "if tonumber(expireIdle) > tonumber(ARGV[1]) then " + + "local value = struct.pack('dLc0', t, string.len(val), val); " + + "redis.call('hset', KEYS[1], key, value); " + + "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); " + + "end; " + + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + + "end; " + + "end; " + + "if expireDate > tonumber(ARGV[1]) then " + + "table.insert(result, key); " + + "table.insert(result, val); " + + "end; " + + "end; " + + "end;" + + "return result;", + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis()); + } + + @Override + public Future> readAllValuesAsync() { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST, + "local s = redis.call('hgetall', KEYS[1]); " + + "local result = {}; " + + "for i, v in ipairs(s) do " + + "if i % 2 == 0 then " + + "local t, val = struct.unpack('dLc0', v); " + + "local key = s[i-1];" + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], key); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[3], key); " + + "if expireIdle ~= false then " + + "if tonumber(expireIdle) > tonumber(ARGV[1]) then " + + "local value = struct.pack('dLc0', t, string.len(val), val); " + + "redis.call('hset', KEYS[1], key, value); " + + "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); " + + "end; " + + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + + "end; " + + "end; " + + "if expireDate > tonumber(ARGV[1]) then " + + "table.insert(result, val); " + + "end; " + + "end; " + + "end;" + + "return result;", + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis()); + } + } diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 260c21fb7..9e07d9818 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -35,6 +35,7 @@ import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.pubsub.Message; @@ -90,7 +91,9 @@ public class CommandDecoder extends ReplayingDecoder { makeCheckpoint = false; } else { CommandData cmd = (CommandData)data; - if (cmd.getCommand().getReplayMultiDecoder() != null && NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())) { + if (cmd.getCommand().getReplayMultiDecoder() != null + && (NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()) + || ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) { makeCheckpoint = false; } } @@ -139,7 +142,7 @@ public class CommandDecoder extends ReplayingDecoder { StateLevel firstLevel = state().getLevels().get(0); StateLevel secondLevel = state().getLevels().get(1); - decodeMulti(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts()); + decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts()); Channel channel = ctx.channel(); MultiDecoder decoder = messageDecoder(cmd, firstLevel.getParts(), channel); @@ -156,7 +159,7 @@ public class CommandDecoder extends ReplayingDecoder { state().resetLevel(); decode(in, cmd, null, ctx.channel()); } else { - decodeMulti(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts()); + decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts()); } } } @@ -267,13 +270,13 @@ public class CommandDecoder extends ReplayingDecoder { } } - decodeMulti(in, data, parts, channel, size, respParts); + decodeList(in, data, parts, channel, size, respParts); } else { throw new IllegalStateException("Can't decode replay " + (char)code); } } - private void decodeMulti(ByteBuf in, CommandData data, List parts, + private void decodeList(ByteBuf in, CommandData data, List parts, Channel channel, long size, List respParts) throws IOException { for (int i = respParts.size(); i < size; i++) { diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index e3737d259..1ee63f244 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -179,6 +179,7 @@ public interface RedisCommands { RedisCommand> EVAL_SET = new RedisCommand>("EVAL", new ObjectSetReplayDecoder()); RedisCommand EVAL_OBJECT = new RedisCommand("EVAL"); RedisCommand EVAL_MAP_VALUE = new RedisCommand("EVAL", ValueType.MAP_VALUE); + RedisCommand>> EVAL_MAP_ENTRY = new RedisCommand>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP); RedisCommand> EVAL_MAP_VALUE_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), ValueType.MAP_VALUE); RedisStrictCommand INCR = new RedisStrictCommand("INCR"); diff --git a/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java new file mode 100644 index 000000000..3b9df4013 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java @@ -0,0 +1,115 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.List; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class ListMultiDecoder implements MultiDecoder { + + private final MultiDecoder[] decoders; + + public static class NestedDecoderState implements DecoderState { + + int index = -1; + int partsIndex = -1; + + public NestedDecoderState() { + } + + public NestedDecoderState(int index) { + super(); + this.index = index; + } + + public void resetPartsIndex() { + partsIndex = -1; + } + + public int incPartsIndex() { + return ++partsIndex; + } + + public int getPartsIndex() { + return partsIndex; + } + + public int incIndex() { + return ++index; + } + + public int getIndex() { + return index; + } + + @Override + public DecoderState copy() { + return new NestedDecoderState(index); + } + + @Override + public String toString() { + return "NestedDecoderState [index=" + index + "]"; + } + + } + + protected final NestedDecoderState getDecoder(State state) { + NestedDecoderState ds = state.getDecoderState(); + if (ds == null) { + ds = new NestedDecoderState(); + state.setDecoderState(ds); + } + return ds; + } + + public ListMultiDecoder(MultiDecoder ... decoders) { + this.decoders = decoders; + } + + public Object decode(ByteBuf buf, State state) throws IOException { + int index = getDecoder(state).getIndex(); + return decoders[index].decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + if (paramNum == 0) { + NestedDecoderState s = getDecoder(state); + s.incIndex(); + s.resetPartsIndex(); + } + return true; + } + + @Override + public Object decode(List parts, State state) { + NestedDecoderState s = getDecoder(state); + int index = s.getIndex(); + index += s.incPartsIndex(); + Object res = decoders[index].decode(parts, state); + if (res == null) { + index = s.incIndex() + s.getPartsIndex(); + return decoders[index].decode(parts, state); + } + return res; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/LongMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/LongMultiDecoder.java new file mode 100644 index 000000000..30a2ee66d --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/LongMultiDecoder.java @@ -0,0 +1,43 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.List; + +import org.redisson.client.codec.LongCodec; +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class LongMultiDecoder implements MultiDecoder { + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return LongCodec.INSTANCE.getValueDecoder().decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return false; + } + + @Override + public Object decode(List parts, State state) { + return null; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResult.java b/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResult.java new file mode 100644 index 000000000..0a6e9b2f2 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResult.java @@ -0,0 +1,34 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.List; +import java.util.Map; + +public class MapCacheScanResult extends MapScanResult { + + private final List idleKeys; + + public MapCacheScanResult(Long pos, Map values, List idleKeys) { + super(pos, values); + this.idleKeys = idleKeys; + }; + + public List getIdleKeys() { + return idleKeys; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java new file mode 100644 index 000000000..afed8cc14 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java @@ -0,0 +1,46 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class MapCacheScanResultReplayDecoder implements MultiDecoder> { + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public MapCacheScanResult decode(List parts, State state) { + Long pos = (Long)parts.get(0); + Map values = (Map)parts.get(1); + List idleKeys = (List) parts.get(2); + return new MapCacheScanResult(pos, values, idleKeys); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return false; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java new file mode 100644 index 000000000..9b3ac26ee --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java @@ -0,0 +1,50 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.List; + +import org.redisson.client.codec.Codec; +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class ObjectListDecoder implements MultiDecoder> { + + private Codec codec; + + public ObjectListDecoder(Codec codec) { + super(); + this.codec = codec; + } + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return codec.getMapKeyDecoder().decode(buf, state); + } + + @Override + public List decode(List parts, State state) { + return (List) parts; + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return false; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java new file mode 100644 index 000000000..3b1c1c6bb --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java @@ -0,0 +1,63 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.codec.Codec; +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class ObjectMapDecoder implements MultiDecoder> { + + private Codec codec; + + public ObjectMapDecoder(Codec codec) { + super(); + this.codec = codec; + } + + private int pos; + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + if (pos++ % 2 == 0) { + return codec.getMapKeyDecoder().decode(buf, state); + } + return codec.getMapValueDecoder().decode(buf, state); + } + + @Override + public Map decode(List parts, State state) { + Map result = new HashMap(parts.size()/2); + for (int i = 0; i < parts.size(); i++) { + if (i % 2 != 0) { + result.put(parts.get(i-1), parts.get(i)); + } + } + return result; + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return true; + } + +} diff --git a/src/test/java/org/redisson/RedissonMapCacheTest.java b/src/test/java/org/redisson/RedissonMapCacheTest.java index e1475d4e7..3edbb2216 100644 --- a/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -14,8 +14,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; import org.redisson.codec.JsonJacksonCodec; @@ -127,13 +125,20 @@ public class RedissonMapCacheTest extends BaseTest { } } + + @Test + public void testCacheValues() { + final RMapCache map = redisson.getMapCache("testRMapCacheValues"); + map.put("1234", "5678", 0, TimeUnit.MINUTES, 60, TimeUnit.MINUTES); + assertThat(map.values()).containsOnly("5678"); + } @Test public void testGetAll() throws InterruptedException { RMapCache map = redisson.getMapCache("getAll"); map.put(1, 100); map.put(2, 200, 1, TimeUnit.SECONDS); - map.put(3, 300, 1, TimeUnit.SECONDS); + map.put(3, 300, 1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS); map.put(4, 400); Map filtered = map.getAll(new HashSet(Arrays.asList(2, 3, 5))); @@ -265,10 +270,12 @@ public class RedissonMapCacheTest extends BaseTest { map.put(2, "33", 1, TimeUnit.SECONDS); map.put(3, "43"); - Assert.assertEquals(3, map.entrySet().size()); - - MatcherAssert.assertThat(map, Matchers.hasEntry(Matchers.equalTo(1), Matchers.equalTo("12"))); - MatcherAssert.assertThat(map, Matchers.hasEntry(Matchers.equalTo(3), Matchers.equalTo("43"))); + Map expected = new HashMap<>(); + map.put(1, "12"); + map.put(3, "43"); + + assertThat(map.entrySet()).containsAll(expected.entrySet()); + assertThat(map).hasSize(3); } @Test @@ -297,7 +304,7 @@ public class RedissonMapCacheTest extends BaseTest { joinMap.put(6, "6"); map.putAll(joinMap); - MatcherAssert.assertThat(map.keySet(), Matchers.containsInAnyOrder(1, 2, 3, 4, 5, 6)); + assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6); } @Test @@ -579,47 +586,6 @@ public class RedissonMapCacheTest extends BaseTest { Assert.assertEquals(1, map.size()); } -// @Test -// public void testKeyIterator() { -// RMap map = redisson.getCache("simple"); -// map.put(1, 0); -// map.put(3, 5); -// map.put(4, 6); -// map.put(7, 8); -// -// Collection keys = map.keySet(); -// MatcherAssert.assertThat(keys, Matchers.containsInAnyOrder(1, 3, 4, 7)); -// for (Iterator iterator = map.keyIterator(); iterator.hasNext();) { -// Integer value = iterator.next(); -// if (!keys.remove(value)) { -// Assert.fail(); -// } -// } -// -// Assert.assertEquals(0, keys.size()); -// } - -// @Test -// public void testValueIterator() { -// RCache map = redisson.getCache("simple"); -// map.put(1, 0); -// map.put(3, 5); -// map.put(4, 6); -// map.put(7, 8); -// -// Collection values = map.values(); -// MatcherAssert.assertThat(values, Matchers.containsInAnyOrder(0, 5, 6, 8)); -// for (Iterator iterator = map.valueIterator(); iterator.hasNext();) { -// Integer value = iterator.next(); -// if (!values.remove(value)) { -// Assert.fail(); -// } -// } -// -// Assert.assertEquals(0, values.size()); -// } - - @Test public void testFastPutIfAbsent() throws Exception { RMapCache map = redisson.getMapCache("simple"); @@ -767,6 +733,7 @@ public class RedissonMapCacheTest extends BaseTest { } + @Test public void testRMapCacheValues() { final RMapCache map = redisson.getMapCache("testRMapCacheValues"); @@ -775,7 +742,18 @@ public class RedissonMapCacheTest extends BaseTest { } @Test - public void testRMapCacheAllValues() { + public void testReadAllEntrySet() throws InterruptedException { + RMapCache map = redisson.getMapCache("simple12"); + map.put(1, "12"); + map.put(2, "33", 10, TimeUnit.MINUTES, 60, TimeUnit.MINUTES); + map.put(3, "43"); + + assertThat(map.readAllEntrySet()).isEqualTo(map.entrySet()); + } + + + @Test + public void testReadAllValues() { final RMapCache map = redisson.getMapCache("testRMapCacheAllValues"); map.put("1234", "5678", 1, TimeUnit.MINUTES, 60, TimeUnit.MINUTES); assertThat(map.readAllValues()).containsOnly("5678"); From b4bf2c66b29ebdd7263c1a8a7b68cf59fbda442f Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 19 Apr 2016 12:14:38 +0300 Subject: [PATCH 2/3] RGeo.addAsync codec definition fixed --- src/main/java/org/redisson/RedissonGeo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/redisson/RedissonGeo.java b/src/main/java/org/redisson/RedissonGeo.java index 527892d25..20ca9a3d1 100644 --- a/src/main/java/org/redisson/RedissonGeo.java +++ b/src/main/java/org/redisson/RedissonGeo.java @@ -62,7 +62,7 @@ public class RedissonGeo extends RedissonExpirable implements RGeo { @Override public Future addAsync(double longitude, double latitude, V member) { - return commandExecutor.writeAsync(getName(), RedisCommands.GEOADD, getName(), convert(longitude), convert(latitude), member); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.GEOADD, getName(), convert(longitude), convert(latitude), member); } private String convert(double longitude) { From fca186befb1946beae3d9a2d326863453b2ab91c Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 19 Apr 2016 16:38:17 +0300 Subject: [PATCH 3/3] Fixed bug with items removing during RedissonMap iteration --- .../org/redisson/RedissonBaseMapIterator.java | 36 +++++++++++-------- .../org/redisson/RedissonMapCacheTest.java | 22 ++++++++++++ .../java/org/redisson/RedissonMapTest.java | 13 +++++++ 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/redisson/RedissonBaseMapIterator.java b/src/main/java/org/redisson/RedissonBaseMapIterator.java index d493efb6d..140cf747a 100644 --- a/src/main/java/org/redisson/RedissonBaseMapIterator.java +++ b/src/main/java/org/redisson/RedissonBaseMapIterator.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -32,7 +31,8 @@ import io.netty.buffer.ByteBuf; abstract class RedissonBaseMapIterator implements Iterator { private Map firstValues; - private Iterator> iter; + private Map lastValues; + private Iterator> lastIter; protected long nextIterPos; protected long startPos = -1; protected InetSocketAddress client; @@ -48,7 +48,7 @@ abstract class RedissonBaseMapIterator implements Iterator { return false; } - if (iter == null || !iter.hasNext()) { + if (lastIter == null || !lastIter.hasNext()) { if (nextIterPos == -1) { return false; } @@ -56,31 +56,38 @@ abstract class RedissonBaseMapIterator implements Iterator { do { prevIterPos = nextIterPos; MapScanResult res = iterator(); + if (lastValues != null) { + free(lastValues); + } + lastValues = convert(res.getMap()); client = res.getRedisClient(); if (startPos == -1) { startPos = res.getPos(); } if (nextIterPos == 0 && firstValues == null) { - firstValues = convert(res.getMap()); + firstValues = lastValues; + lastValues = null; } else { - Map newValues = convert(res.getMap()); - if (firstValues.entrySet().containsAll(newValues.entrySet())) { + if (firstValues.isEmpty()) { + firstValues = lastValues; + lastValues = null; + } else if (lastValues.keySet().removeAll(firstValues.keySet())) { finished = true; free(firstValues); - free(newValues); + free(lastValues); firstValues = null; + lastValues = null; return false; } - free(newValues); } - iter = res.getMap().entrySet().iterator(); + lastIter = res.getMap().entrySet().iterator(); nextIterPos = res.getPos(); - } while (!iter.hasNext() && nextIterPos != prevIterPos); + } while (!lastIter.hasNext() && nextIterPos != prevIterPos); if (prevIterPos == nextIterPos && !removeExecuted) { nextIterPos = -1; } } - return iter.hasNext(); + return lastIter.hasNext(); } @@ -107,7 +114,7 @@ abstract class RedissonBaseMapIterator implements Iterator { throw new NoSuchElementException("No such element at index"); } - entry = iter.next(); + entry = lastIter.next(); currentElementRemoved = false; return getValue(entry); } @@ -129,11 +136,12 @@ abstract class RedissonBaseMapIterator implements Iterator { if (currentElementRemoved) { throw new IllegalStateException("Element been already deleted"); } - if (iter == null) { + if (lastIter == null) { throw new IllegalStateException(); } - iter.remove(); + firstValues.remove(entry.getKey().getBuf()); + lastIter.remove(); removeKey(); currentElementRemoved = true; removeExecuted = true; diff --git a/src/test/java/org/redisson/RedissonMapCacheTest.java b/src/test/java/org/redisson/RedissonMapCacheTest.java index 3edbb2216..c98039f4f 100644 --- a/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -227,6 +227,28 @@ public class RedissonMapCacheTest extends BaseTest { Assert.assertEquals(0, map.size()); } + @Test + public void testIteratorRandomRemoveFirst() throws InterruptedException { + RMapCache map = redisson.getMapCache("simpleMap"); + for (int i = 0; i < 1000; i++) { + map.put(i, i*10); + } + + int cnt = 0; + int removed = 0; + Iterator> iterator = map.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + if (cnt < 20) { + iterator.remove(); + removed++; + } + cnt++; + } + Assert.assertEquals(1000, cnt); + assertThat(map.size()).isEqualTo(cnt - removed); + } + @Test public void testIteratorRandomRemoveHighVolume() throws InterruptedException { RMapCache map = redisson.getMapCache("simpleMap"); diff --git a/src/test/java/org/redisson/RedissonMapTest.java b/src/test/java/org/redisson/RedissonMapTest.java index 0a46a6558..8ae9a0196 100644 --- a/src/test/java/org/redisson/RedissonMapTest.java +++ b/src/test/java/org/redisson/RedissonMapTest.java @@ -11,6 +11,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -408,6 +409,18 @@ public class RedissonMapTest extends BaseTest { Map testMap = new HashMap<>(map); assertThat(map.readAllKeySet()).containsOnlyElementsOf(testMap.keySet()); } + + @Test + public void testReadAllKeySetHighAmount() { + RMap map = redisson.getMap("simple"); + for (int i = 0; i < 1000; i++) { + map.put(new SimpleKey("" + i), new SimpleValue("" + i)); + } + + assertThat(map.readAllKeySet().size()).isEqualTo(1000); + Map testMap = new HashMap<>(map); + assertThat(map.readAllKeySet()).containsOnlyElementsOf(testMap.keySet()); + } @Test public void testReadAllValues() {