From 084d368cfacf4053a379a7df8da48c7d73813f09 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 6 Jan 2016 14:57:32 +0300 Subject: [PATCH] RMap iterator state comparison fixed. #357 --- src/main/java/org/redisson/RedissonMap.java | 16 ++-- .../java/org/redisson/RedissonMapCache.java | 11 ++- .../org/redisson/RedissonMapIterator.java | 41 +++++++--- .../org/redisson/client/codec/ScanCodec.java | 78 +++++++++++++++++++ .../protocol/decoder/ScanObjectEntry.java | 38 +++++++++ 5 files changed, 165 insertions(+), 19 deletions(-) create mode 100644 src/main/java/org/redisson/client/codec/ScanCodec.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index e0a9f42be..1877b313f 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -30,7 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -39,11 +41,13 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.MapScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.core.Predicate; import org.redisson.core.RMap; +import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.Future; /** @@ -315,8 +319,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(fastRemoveAsync(keys)); } - MapScanResult scanIterator(InetSocketAddress client, long startPos) { - Future> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos); + MapScanResult scanIterator(InetSocketAddress client, long startPos) { + Future> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); return get(f); } @@ -329,8 +333,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { public Iterator valueIterator() { return new RedissonMapIterator(this) { @Override - V getValue(java.util.Map.Entry entry) { - return entry.getValue(); + V getValue(java.util.Map.Entry entry) { + return (V) entry.getValue().getObj(); } }; } @@ -339,8 +343,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { public Iterator keyIterator() { return new RedissonMapIterator(this) { @Override - K getValue(java.util.Map.Entry entry) { - return entry.getKey(); + K getValue(java.util.Map.Entry entry) { + return (K) entry.getKey().getObj(); } }; } diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java index f9f74523e..0aa4b5f46 100644 --- a/src/main/java/org/redisson/RedissonMapCache.java +++ b/src/main/java/org/redisson/RedissonMapCache.java @@ -37,6 +37,7 @@ import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.CacheGetAllDecoder; @@ -237,7 +238,10 @@ public class RedissonMapCache extends RedissonMap implements RMapCac Future> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL, "local value = redis.call('hget', KEYS[1], ARGV[1]); " + "local expireDate = redis.call('zscore', KEYS[2], ARGV[1]); " - + "if expireDate == false then " + + "local s = string.gmatch(value, '[^:]+'); " + + "print(s()) " + +// "redis.call('zscore', KEYS[2], ARGV[1]); " + "if expireDate == false then " + "expireDate = 92233720368547758; " + "end; " + "return {expireDate, value}; ", @@ -322,6 +326,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL, "local v = redis.call('hget', KEYS[1], ARGV[2]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + + "redis.call('zadd', 'test_map', ARGV[1], ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + "return v", Arrays.asList(getName(), getTimeoutSetName()), timeoutDate, key, value); @@ -355,8 +360,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - MapScanResult scanIterator(InetSocketAddress client, long startPos) { - Future> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN, + MapScanResult scanIterator(InetSocketAddress client, long startPos) { + Future> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN, "local result = {}; " + "local res = redis.call('hscan', KEYS[1], ARGV[1]); " + "for i, value in ipairs(res[2]) do " diff --git a/src/main/java/org/redisson/RedissonMapIterator.java b/src/main/java/org/redisson/RedissonMapIterator.java index 72e3e5b12..7ed002fb1 100644 --- a/src/main/java/org/redisson/RedissonMapIterator.java +++ b/src/main/java/org/redisson/RedissonMapIterator.java @@ -16,22 +16,27 @@ package org.redisson; import java.net.InetSocketAddress; +import java.util.AbstractMap; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; import org.redisson.client.protocol.decoder.MapScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; + +import io.netty.buffer.ByteBuf; public class RedissonMapIterator implements Iterator { - private Map firstValues; - private Iterator> iter; + private Map firstValues; + private Iterator> iter; private long iterPos = 0; private InetSocketAddress client; private boolean removeExecuted; - private Map.Entry entry; + private Map.Entry entry; private final RedissonMap map; @@ -42,19 +47,27 @@ public class RedissonMapIterator implements Iterator { @Override public boolean hasNext() { if (iter == null || !iter.hasNext()) { - MapScanResult res = map.scanIterator(client, iterPos); + MapScanResult res = map.scanIterator(client, iterPos); client = res.getRedisClient(); if (iterPos == 0 && firstValues == null) { - firstValues = (Map) res.getMap(); - } else if (res.getMap().equals(firstValues)) { + firstValues = convert(res.getMap()); + } else if (convert(res.getMap()).equals(firstValues)) { return false; } - iter = ((Map)res.getMap()).entrySet().iterator(); + iter = res.getMap().entrySet().iterator(); iterPos = res.getPos(); } return iter.hasNext(); } + private Map convert(Map map) { + Map result = new HashMap(map.size()); + for (Entry entry : map.entrySet()) { + result.put(entry.getKey().getBuf(), entry.getValue().getBuf()); + } + return result; + } + @Override public M next() { if (!hasNext()) { @@ -66,8 +79,16 @@ public class RedissonMapIterator implements Iterator { return getValue(entry); } - M getValue(Entry entry) { - return (M) entry; + @SuppressWarnings("unchecked") + M getValue(final Entry entry) { + return (M)new AbstractMap.SimpleEntry((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) { + + @Override + public V setValue(V value) { + return map.put((K) entry.getKey().getObj(), value); + } + + }; } @Override @@ -79,7 +100,7 @@ public class RedissonMapIterator implements Iterator { // lazy init iterator hasNext(); iter.remove(); - map.fastRemove(entry.getKey()); + map.fastRemove((K)entry.getKey().getObj()); removeExecuted = true; } diff --git a/src/main/java/org/redisson/client/codec/ScanCodec.java b/src/main/java/org/redisson/client/codec/ScanCodec.java new file mode 100644 index 000000000..4f2259f1e --- /dev/null +++ b/src/main/java/org/redisson/client/codec/ScanCodec.java @@ -0,0 +1,78 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import java.io.IOException; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; +import org.redisson.client.protocol.decoder.ScanObjectEntry; + +import io.netty.buffer.ByteBuf; + +public class ScanCodec implements Codec { + + public Codec delegate; + + public ScanCodec(Codec delegate) { + super(); + this.delegate = delegate; + } + + @Override + public Decoder getValueDecoder() { + return delegate.getValueDecoder(); + } + + @Override + public Encoder getValueEncoder() { + return delegate.getValueEncoder(); + } + + @Override + public Decoder getMapValueDecoder() { + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + Object val = delegate.getMapValueDecoder().decode(buf, state); + return new ScanObjectEntry(buf, val); + } + }; + } + + @Override + public Encoder getMapValueEncoder() { + return delegate.getMapValueEncoder(); + } + + @Override + public Decoder getMapKeyDecoder() { + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + Object val = delegate.getMapKeyDecoder().decode(buf, state); + return new ScanObjectEntry(buf, val); + } + }; + } + + @Override + public Encoder getMapKeyEncoder() { + return delegate.getMapKeyEncoder(); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java b/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java new file mode 100644 index 000000000..1c7ed8bbe --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java @@ -0,0 +1,38 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import io.netty.buffer.ByteBuf; + +public class ScanObjectEntry { + + private final ByteBuf buf; + private final Object obj; + + public ScanObjectEntry(ByteBuf buf, Object obj) { + this.buf = buf; + this.obj = obj; + } + + public ByteBuf getBuf() { + return buf; + } + + public Object getObj() { + return obj; + } + +}