From dffda7e41171904c7090849dfc783538603f3c60 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 12 Jan 2017 14:25:30 +0300 Subject: [PATCH] Fixed - RedissonBaseIterator.hasNext() doesn't return false on Object based RedissonSet. #718 --- .../org/redisson/RedissonBaseIterator.java | 43 +++++-- .../main/java/org/redisson/RedissonKeys.java | 10 +- .../main/java/org/redisson/RedissonMap.java | 4 +- .../java/org/redisson/RedissonMapCache.java | 4 +- .../java/org/redisson/RedissonMultimap.java | 4 +- .../org/redisson/RedissonScoredSortedSet.java | 8 +- .../main/java/org/redisson/RedissonSet.java | 8 +- .../java/org/redisson/RedissonSetCache.java | 12 +- .../redisson/RedissonSetMultimapValues.java | 8 +- .../redisson/client/codec/MapScanCodec.java | 100 ++++++++++++++++ .../org/redisson/client/codec/ScanCodec.java | 44 ++----- .../redisson/command/CommandAsyncService.java | 41 +++++-- .../main/java/org/redisson/jcache/JCache.java | 4 +- .../reactive/RedissonMapCacheReactive.java | 4 +- .../reactive/RedissonMapReactive.java | 4 +- .../RedissonScoredSortedSetReactive.java | 8 +- .../reactive/RedissonSetCacheReactive.java | 5 +- .../reactive/RedissonSetReactive.java | 8 +- .../reactive/SetReactiveIterator.java | 111 +++++++++++++----- .../RedissonScoredSortedSetReactiveTest.java | 6 +- .../RedissonSetCacheReactiveTest.java | 11 +- 21 files changed, 326 insertions(+), 121 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java diff --git a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java index 0f85a46d9..fc9151f8d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java @@ -22,12 +22,15 @@ import java.util.List; import java.util.NoSuchElementException; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; + +import io.netty.buffer.ByteBuf; abstract class RedissonBaseIterator implements Iterator { - private List firstValues; - private List lastValues; - private Iterator lastIter; + private List firstValues; + private List lastValues; + private Iterator lastIter; protected long nextIterPos; protected InetSocketAddress client; @@ -40,6 +43,8 @@ abstract class RedissonBaseIterator implements Iterator { public boolean hasNext() { if (lastIter == null || !lastIter.hasNext()) { if (finished) { + free(firstValues); + free(lastValues); currentElementRemoved = false; removeExecuted = false; @@ -56,8 +61,12 @@ abstract class RedissonBaseIterator implements Iterator { long prevIterPos; do { prevIterPos = nextIterPos; - ListScanResult res = iterator(client, nextIterPos); - lastValues = new ArrayList(res.getValues()); + ListScanResult res = iterator(client, nextIterPos); + if (lastValues != null) { + free(lastValues); + } + + lastValues = convert(res.getValues()); client = res.getRedisClient(); if (nextIterPos == 0 && firstValues == null) { @@ -87,6 +96,9 @@ abstract class RedissonBaseIterator implements Iterator { } } } else if (lastValues.removeAll(firstValues)) { + free(firstValues); + free(lastValues); + currentElementRemoved = false; removeExecuted = false; client = null; @@ -111,11 +123,28 @@ abstract class RedissonBaseIterator implements Iterator { return lastIter.hasNext(); } + private List convert(List list) { + List result = new ArrayList(list.size()); + for (ScanObjectEntry entry : list) { + result.add(entry.getBuf()); + } + return result; + } + + private void free(List list) { + if (list == null) { + return; + } + for (ByteBuf byteBuf : list) { + byteBuf.release(); + } + } + protected boolean tryAgain() { return false; } - abstract ListScanResult iterator(InetSocketAddress client, long nextIterPos); + abstract ListScanResult iterator(InetSocketAddress client, long nextIterPos); @Override public V next() { @@ -123,7 +152,7 @@ abstract class RedissonBaseIterator implements Iterator { throw new NoSuchElementException("No such element"); } - value = lastIter.next(); + value = (V) lastIter.next().getObj(); currentElementRemoved = false; return value; } diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 84e994b7e..449cbdec4 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -33,9 +33,11 @@ import org.redisson.api.RFuture; import org.redisson.api.RKeys; import org.redisson.api.RType; import org.redisson.client.RedisException; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; @@ -104,12 +106,12 @@ public class RedissonKeys implements RKeys { return getKeysByPattern(null); } - private ListScanResult scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) { + private ListScanResult scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { - RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count); return commandExecutor.get(f); } - RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); return commandExecutor.get(f); } @@ -117,7 +119,7 @@ public class RedissonKeys implements RKeys { return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 424edc9bd..b8dc79afc 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -34,7 +34,7 @@ import org.redisson.api.RLock; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -454,7 +454,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RFuture> f - = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos); + = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 991b1aa50..e260ee7f1 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -30,7 +30,7 @@ import org.redisson.api.RMapCache; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -531,7 +531,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", - new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); + new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new MapScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); RFuture> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN, "local result = {}; " + "local idleKeys = {}; " diff --git a/redisson/src/main/java/org/redisson/RedissonMultimap.java b/redisson/src/main/java/org/redisson/RedissonMultimap.java index 6a8bde1ca..7bd673874 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonMultimap.java @@ -35,7 +35,7 @@ import org.redisson.api.RMultimap; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -271,7 +271,7 @@ public abstract class RedissonMultimap extends RedissonExpirable implement MapScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); + RFuture> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 771483f56..a4d7cb95f 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -34,6 +34,7 @@ import org.redisson.api.SortOrder; import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScoredCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -41,6 +42,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -259,8 +261,8 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), o); } - private ListScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); + private ListScanResult scanIterator(InetSocketAddress client, long startPos) { + RFuture> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); return get(f); } @@ -269,7 +271,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 3d88107e9..524cae608 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -28,11 +28,13 @@ import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -81,8 +83,8 @@ public class RedissonSet extends RedissonExpirable implements RSet { return getName(); } - ListScanResult scanIterator(String name, InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos); + ListScanResult scanIterator(String name, InetSocketAddress client, long startPos) { + RFuture> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos); return get(f); } @@ -91,7 +93,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(getName(), client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 408853ce6..451edd4ef 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -28,12 +28,14 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RSetCache; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -102,13 +104,13 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< Arrays.asList(getName()), System.currentTimeMillis(), o); } - ListScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = scanIteratorAsync(client, startPos); + ListScanResult scanIterator(InetSocketAddress client, long startPos) { + RFuture> f = scanIteratorAsync(client, startPos); return get(f); } - public RFuture> scanIteratorAsync(InetSocketAddress client, long startPos) { - return commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_ZSCAN, + public RFuture> scanIteratorAsync(InetSocketAddress client, long startPos) { + return commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), RedisCommands.EVAL_ZSCAN, "local result = {}; " + "local res = redis.call('zscan', KEYS[1], ARGV[1]); " + "for i, value in ipairs(res[2]) do " @@ -127,7 +129,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index 049f0d292..239a7d771 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -29,6 +29,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -39,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -161,8 +163,8 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R Arrays.asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o); } - private ListScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN, + private ListScanResult scanIterator(InetSocketAddress client, long startPos) { + RFuture> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_SSCAN, "local expireDate = 92233720368547758; " + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " + "if expireDateScore ~= false then " @@ -182,7 +184,7 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java new file mode 100644 index 000000000..8ee35fd74 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java @@ -0,0 +1,100 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import java.io.IOException; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; +import org.redisson.client.protocol.decoder.ScanObjectEntry; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * + * @author Nikita Koksharov + * + */ +public class MapScanCodec implements Codec { + + private final Codec delegate; + private final Codec mapValueCodec; + + public MapScanCodec(Codec delegate) { + this(delegate, null); + } + + public MapScanCodec(Codec delegate, Codec mapValueCodec) { + this.delegate = delegate; + this.mapValueCodec = mapValueCodec; + } + + @Override + public Decoder getValueDecoder() { + return delegate.getValueDecoder(); + } + + @Override + public Encoder getValueEncoder() { + return delegate.getValueEncoder(); + } + + @Override + public Decoder getMapValueDecoder() { + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + ByteBuf b = Unpooled.copiedBuffer(buf); + Codec c = delegate; + if (mapValueCodec != null) { + c = mapValueCodec; + } + Object val = c.getMapValueDecoder().decode(buf, state); + return new ScanObjectEntry(b, val); + } + }; + } + + @Override + public Encoder getMapValueEncoder() { + Codec c = delegate; + if (mapValueCodec != null) { + c = mapValueCodec; + } + + return c.getMapValueEncoder(); + } + + @Override + public Decoder getMapKeyDecoder() { + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + ByteBuf b = Unpooled.copiedBuffer(buf); + Object val = delegate.getMapKeyDecoder().decode(buf, state); + return new ScanObjectEntry(b, val); + } + }; + } + + @Override + public Encoder getMapKeyEncoder() { + return delegate.getMapKeyEncoder(); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java index 407eb13ce..d4d0c6335 100644 --- a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java @@ -33,20 +33,21 @@ import io.netty.buffer.Unpooled; public class ScanCodec implements Codec { private final Codec delegate; - private final Codec mapValueCodec; public ScanCodec(Codec delegate) { - this(delegate, null); - } - - public ScanCodec(Codec delegate, Codec mapValueCodec) { this.delegate = delegate; - this.mapValueCodec = mapValueCodec; } @Override public Decoder getValueDecoder() { - return delegate.getValueDecoder(); + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + ByteBuf b = Unpooled.copiedBuffer(buf); + Object val = delegate.getValueDecoder().decode(buf, state); + return new ScanObjectEntry(b, val); + } + }; } @Override @@ -56,40 +57,17 @@ public class ScanCodec implements Codec { @Override public Decoder getMapValueDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - ByteBuf b = Unpooled.copiedBuffer(buf); - Codec c = delegate; - if (mapValueCodec != null) { - c = mapValueCodec; - } - Object val = c.getMapValueDecoder().decode(buf, state); - return new ScanObjectEntry(b, val); - } - }; + return delegate.getMapValueDecoder(); } @Override public Encoder getMapValueEncoder() { - Codec c = delegate; - if (mapValueCodec != null) { - c = mapValueCodec; - } - - return c.getMapValueEncoder(); + return delegate.getMapValueEncoder(); } @Override public Decoder getMapKeyDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - ByteBuf b = Unpooled.copiedBuffer(buf); - Object val = delegate.getMapKeyDecoder().decode(buf, state); - return new ScanObjectEntry(b, val); - } - }; + return delegate.getMapKeyDecoder(); } @Override diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 5b3a2721c..677898bc2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -804,22 +804,47 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private void handleReference(RPromise mainPromise, R res) { - if (res instanceof List || res instanceof ListScanResult) { - List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res; + if (res instanceof List) { + List r = (List)res; for (int i = 0; i < r.size(); i++) { if (r.get(i) instanceof RedissonReference) { try { - r.set(i ,(redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)))); + r.set(i, redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i))); } catch (Exception exception) {//skip and carry on to next one. } } else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) { try { - ScoredEntry se = ((ScoredEntry) r.get(i)); - r.set(i ,new ScoredEntry(se.getScore(), redisson != null + ScoredEntry se = ((ScoredEntry) r.get(i)); + se = new ScoredEntry(se.getScore(), redisson != null ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue()))); + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue())); + r.set(i, se); + } catch (Exception exception) {//skip and carry on to next one. + } + } + } + mainPromise.trySuccess(res); + } else if (res instanceof ListScanResult) { + List r = ((ListScanResult)res).getValues(); + for (int i = 0; i < r.size(); i++) { + ScanObjectEntry e = r.get(i); + if (e.getObj() instanceof RedissonReference) { + try { + r.set(i , new ScanObjectEntry(e.getBuf(), redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getObj()) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getObj()))); + } catch (Exception exception) {//skip and carry on to next one. + } + } else if (e.getObj() instanceof ScoredEntry && ((ScoredEntry) e.getObj()).getValue() instanceof RedissonReference) { + try { + ScoredEntry se = ((ScoredEntry) e.getObj()); + se = new ScoredEntry(se.getScore(), redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue())); + + r.set(i, new ScanObjectEntry(e.getBuf(), se)); } catch (Exception exception) {//skip and carry on to next one. } } diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 67490046b..4c9c0c21a 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -58,7 +58,7 @@ import org.redisson.api.RLock; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; import org.redisson.api.listener.MessageListener; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -2089,7 +2089,7 @@ public class JCache extends RedissonObject implements Cache { MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RFuture> f - = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos); + = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index dc7599072..d4b99f93f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -30,7 +30,7 @@ import org.redisson.EvictionScheduler; import org.redisson.api.RMapCacheReactive; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -342,7 +342,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im @Override Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.evalReadReactive(client, getName(), new ScanCodec(codec), EVAL_HSCAN, + return commandExecutor.evalReadReactive(client, getName(), new MapScanCodec(codec), EVAL_HSCAN, "local result = {}; " + "local res = redis.call('hscan', KEYS[1], ARGV[1]); " + "for i, value in ipairs(res[2]) do " diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index d67bce646..523a3c990 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -24,7 +24,7 @@ import org.reactivestreams.Publisher; import org.redisson.RedissonMap; import org.redisson.api.RMapReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -130,7 +130,7 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme } Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); + return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 927b4056d..7f87a584d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -23,6 +23,7 @@ import java.util.Collections; import org.reactivestreams.Publisher; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -30,6 +31,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; public class RedissonScoredSortedSetReactive extends RedissonExpirableReactive implements RScoredSortedSetReactive { @@ -122,15 +124,15 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv return commandExecutor.readReactive(getName(), codec, RedisCommands.ZRANK, getName(), o); } - private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); + private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); } @Override public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index cac153850..be8fd738b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -30,6 +30,7 @@ import org.redisson.api.RSetCacheReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; /** @@ -76,7 +77,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple return reactive(instance.containsAsync(o)); } - Publisher> scanIterator(InetSocketAddress client, long startPos) { + Publisher> scanIterator(InetSocketAddress client, long startPos) { return reactive(instance.scanIteratorAsync(client, startPos)); } @@ -84,7 +85,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 0a9ea284d..fa5db8e9f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -26,8 +26,10 @@ import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RSetReactive; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; /** @@ -66,8 +68,8 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements return reactive(instance.containsAsync(o)); } - private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); + private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos); } @Override @@ -156,7 +158,7 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java index 787866822..95adcd488 100644 --- a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -16,13 +16,16 @@ package org.redisson.reactive; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; +import io.netty.buffer.ByteBuf; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -32,28 +35,27 @@ public abstract class SetReactiveIterator extends Stream { public void subscribe(final Subscriber t) { t.onSubscribe(new ReactiveSubscription(this, t) { - private List firstValues; + private List firstValues; + private List lastValues; private long nextIterPos; private InetSocketAddress client; - private long currentIndex; + private boolean finished; @Override protected void onRequest(long n) { - currentIndex = n; - nextValues(); } - private void handle(List vals) { - for (V val : vals) { - onNext(val); + private void handle(List vals) { + for (ScanObjectEntry val : vals) { + onNext((V)val.getObj()); } } protected void nextValues() { final ReactiveSubscription m = this; - scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { + scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @Override public void onSubscribe(Subscription s) { @@ -61,32 +63,68 @@ public abstract class SetReactiveIterator extends Stream { } @Override - public void onNext(ListScanResult res) { - client = res.getRedisClient(); - - long prevIterPos = nextIterPos; - if (nextIterPos == 0 && firstValues == null) { - firstValues = res.getValues(); - } else if (res.getValues().equals(firstValues)) { - m.onComplete(); - currentIndex = 0; + public void onNext(ListScanResult res) { + if (finished) { + free(firstValues); + free(lastValues); + + client = null; + firstValues = null; + lastValues = null; + nextIterPos = 0; return; } - nextIterPos = res.getPos(); - if (prevIterPos == nextIterPos) { - nextIterPos = -1; + long prevIterPos = nextIterPos; + if (lastValues != null) { + free(lastValues); + } + + lastValues = convert(res.getValues()); + client = res.getRedisClient(); + + if (nextIterPos == 0 && firstValues == null) { + firstValues = lastValues; + lastValues = null; + if (firstValues.isEmpty()) { + client = null; + firstValues = null; + nextIterPos = 0; + prevIterPos = -1; + } + } else { + if (firstValues.isEmpty()) { + firstValues = lastValues; + lastValues = null; + if (firstValues.isEmpty()) { + if (res.getPos() == 0) { + finished = true; + m.onComplete(); + return; + } + } + } else if (lastValues.removeAll(firstValues)) { + free(firstValues); + free(lastValues); + + client = null; + firstValues = null; + lastValues = null; + nextIterPos = 0; + prevIterPos = -1; + finished = true; + m.onComplete(); + return; + } } handle(res.getValues()); - if (currentIndex == 0) { - return; - } - - if (nextIterPos == -1) { + nextIterPos = res.getPos(); + + if (prevIterPos == nextIterPos) { + finished = true; m.onComplete(); - currentIndex = 0; } } @@ -97,7 +135,7 @@ public abstract class SetReactiveIterator extends Stream { @Override public void onComplete() { - if (currentIndex == 0) { + if (finished) { return; } nextValues(); @@ -106,7 +144,24 @@ public abstract class SetReactiveIterator extends Stream { } }); } + + private void free(List list) { + if (list == null) { + return; + } + for (ByteBuf byteBuf : list) { + byteBuf.release(); + } + } + + private List convert(List list) { + List result = new ArrayList(list.size()); + for (ScanObjectEntry entry : list) { + result.add(entry.getBuf()); + } + return result; + } - protected abstract Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos); + protected abstract Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos); } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java index cc9c6605a..e295a9306 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java @@ -86,9 +86,9 @@ public class RedissonScoredSortedSetReactiveTest extends BaseReactiveTest { @Test public void testRemoveAsync() throws InterruptedException, ExecutionException { RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); - set.add(0.11, 1); - set.add(0.22, 3); - set.add(0.33, 7); + sync(set.add(0.11, 1)); + sync(set.add(0.22, 3)); + sync(set.add(0.33, 7)); Assert.assertTrue(sync(set.remove(1))); Assert.assertFalse(sync(set.contains(1))); diff --git a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java index 37b3372f4..6fd89186f 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java @@ -75,10 +75,10 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest { assertThat(sync(set.add("123", 1, TimeUnit.SECONDS))).isFalse(); - Thread.sleep(50); + Thread.sleep(800); assertThat(sync(set.contains("123"))).isTrue(); - Thread.sleep(150); + Thread.sleep(250); assertThat(sync(set.contains("123"))).isFalse(); } @@ -104,12 +104,15 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest { } @Test - public void testIteratorSequence() { + public void testIteratorSequence() throws InterruptedException { RSetCacheReactive set = redisson.getSetCache("set"); for (int i = 0; i < 1000; i++) { - sync(set.add(Long.valueOf(i))); + set.add(Long.valueOf(i)); } + Thread.sleep(1000); + assertThat(sync(set.size())).isEqualTo(1000); + Set setCopy = new HashSet(); for (int i = 0; i < 1000; i++) { setCopy.add(Long.valueOf(i));