Fixed - RedissonBaseIterator.hasNext() doesn't return false on Object based RedissonSet. #718

pull/762/head
Nikita 8 years ago
parent 5cb8ab442c
commit dffda7e411

@ -22,12 +22,15 @@ import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<V> firstValues;
private List<V> lastValues;
private Iterator<V> lastIter;
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private Iterator<ScanObjectEntry> lastIter;
protected long nextIterPos;
protected InetSocketAddress client;
@ -40,6 +43,8 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
if (finished) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
removeExecuted = false;
@ -56,8 +61,12 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
long prevIterPos;
do {
prevIterPos = nextIterPos;
ListScanResult<V> res = iterator(client, nextIterPos);
lastValues = new ArrayList<V>(res.getValues());
ListScanResult<ScanObjectEntry> res = iterator(client, nextIterPos);
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getValues());
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
@ -87,6 +96,9 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
}
}
} else if (lastValues.removeAll(firstValues)) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
removeExecuted = false;
client = null;
@ -111,11 +123,28 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
return lastIter.hasNext();
}
private List<ByteBuf> convert(List<ScanObjectEntry> list) {
List<ByteBuf> result = new ArrayList<ByteBuf>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getBuf());
}
return result;
}
private void free(List<ByteBuf> list) {
if (list == null) {
return;
}
for (ByteBuf byteBuf : list) {
byteBuf.release();
}
}
protected boolean tryAgain() {
return false;
}
abstract ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos);
abstract ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos);
@Override
public V next() {
@ -123,7 +152,7 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
throw new NoSuchElementException("No such element");
}
value = lastIter.next();
value = (V) lastIter.next().getObj();
currentElementRemoved = false;
return value;
}

@ -33,9 +33,11 @@ import org.redisson.api.RFuture;
import org.redisson.api.RKeys;
import org.redisson.api.RType;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
@ -104,12 +106,12 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null);
}
private ListScanResult<String> scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<String>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f);
}
RFuture<ListScanResult<String>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
return commandExecutor.get(f);
}
@ -117,7 +119,7 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<String>() {
@Override
ListScanResult<String> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
}

@ -34,7 +34,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -454,7 +454,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos);
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
return get(f);
}

@ -30,7 +30,7 @@ import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -531,7 +531,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new MapScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
RFuture<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,
"local result = {}; "
+ "local idleKeys = {}; "

@ -35,7 +35,7 @@ import org.redisson.api.RMultimap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -271,7 +271,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
return get(f);
}

@ -34,6 +34,7 @@ import org.redisson.api.SortOrder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.ScoredCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -41,6 +42,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -259,8 +261,8 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), o);
}
private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
return get(f);
}
@ -269,7 +271,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}

@ -28,11 +28,13 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -81,8 +83,8 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return getName();
}
ListScanResult<V> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos);
ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
return get(f);
}
@ -91,7 +93,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos);
}

@ -28,12 +28,14 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCache;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -102,13 +104,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
Arrays.<Object>asList(getName()), System.currentTimeMillis(), o);
}
ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = scanIteratorAsync(client, startPos);
ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(client, startPos);
return get(f);
}
public RFuture<ListScanResult<V>> scanIteratorAsync(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_ZSCAN,
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
"local result = {}; "
+ "local res = redis.call('zscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
@ -127,7 +129,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}

@ -29,6 +29,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -39,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -161,8 +163,8 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
}
private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN,
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_SSCAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false then "
@ -182,7 +184,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}

@ -0,0 +1,100 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
*
* @author Nikita Koksharov
*
*/
public class MapScanCodec implements Codec {
private final Codec delegate;
private final Codec mapValueCodec;
public MapScanCodec(Codec delegate) {
this(delegate, null);
}
public MapScanCodec(Codec delegate, Codec mapValueCodec) {
this.delegate = delegate;
this.mapValueCodec = mapValueCodec;
}
@Override
public Decoder<Object> getValueDecoder() {
return delegate.getValueDecoder();
}
@Override
public Encoder getValueEncoder() {
return delegate.getValueEncoder();
}
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
}
@Override
public Encoder getMapValueEncoder() {
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
return c.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
}
@Override
public Encoder getMapKeyEncoder() {
return delegate.getMapKeyEncoder();
}
}

@ -33,20 +33,21 @@ import io.netty.buffer.Unpooled;
public class ScanCodec implements Codec {
private final Codec delegate;
private final Codec mapValueCodec;
public ScanCodec(Codec delegate) {
this(delegate, null);
}
public ScanCodec(Codec delegate, Codec mapValueCodec) {
this.delegate = delegate;
this.mapValueCodec = mapValueCodec;
}
@Override
public Decoder<Object> getValueDecoder() {
return delegate.getValueDecoder();
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Object val = delegate.getValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
}
@Override
@ -56,40 +57,17 @@ public class ScanCodec implements Codec {
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
return delegate.getMapValueDecoder();
}
@Override
public Encoder getMapValueEncoder() {
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
return c.getMapValueEncoder();
return delegate.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
return delegate.getMapKeyDecoder();
}
@Override

@ -804,22 +804,47 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
if (res instanceof List || res instanceof ListScanResult) {
List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res;
if (res instanceof List) {
List<Object> r = (List<Object>)res;
for (int i = 0; i < r.size(); i++) {
if (r.get(i) instanceof RedissonReference) {
try {
r.set(i ,(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) r.get(i))));
r.set(i, redisson != null
? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)));
} catch (Exception exception) {//skip and carry on to next one.
}
} else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) {
try {
ScoredEntry se = ((ScoredEntry) r.get(i));
r.set(i ,new ScoredEntry(se.getScore(), redisson != null
ScoredEntry<?> se = ((ScoredEntry<?>) r.get(i));
se = new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue())));
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue()));
r.set(i, se);
} catch (Exception exception) {//skip and carry on to next one.
}
}
}
mainPromise.trySuccess(res);
} else if (res instanceof ListScanResult) {
List<ScanObjectEntry> r = ((ListScanResult)res).getValues();
for (int i = 0; i < r.size(); i++) {
ScanObjectEntry e = r.get(i);
if (e.getObj() instanceof RedissonReference) {
try {
r.set(i , new ScanObjectEntry(e.getBuf(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getObj())));
} catch (Exception exception) {//skip and carry on to next one.
}
} else if (e.getObj() instanceof ScoredEntry && ((ScoredEntry<?>) e.getObj()).getValue() instanceof RedissonReference) {
try {
ScoredEntry<?> se = ((ScoredEntry<?>) e.getObj());
se = new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue()));
r.set(i, new ScanObjectEntry(e.getBuf(), se));
} catch (Exception exception) {//skip and carry on to next one.
}
}

@ -58,7 +58,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -2089,7 +2089,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos);
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
return get(f);
}

@ -30,7 +30,7 @@ import org.redisson.EvictionScheduler;
import org.redisson.api.RMapCacheReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -342,7 +342,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
@Override
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadReactive(client, getName(), new ScanCodec(codec), EVAL_HSCAN,
return commandExecutor.evalReadReactive(client, getName(), new MapScanCodec(codec), EVAL_HSCAN,
"local result = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "

@ -24,7 +24,7 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.api.RMapReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -130,7 +130,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
}
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
}
@Override

@ -23,6 +23,7 @@ import java.util.Collections;
import org.reactivestreams.Publisher;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -30,6 +31,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactive implements RScoredSortedSetReactive<V> {
@ -122,15 +124,15 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
return commandExecutor.readReactive(getName(), codec, RedisCommands.ZRANK, getName(), o);
}
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};

@ -30,6 +30,7 @@ import org.redisson.api.RSetCacheReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
/**
@ -76,7 +77,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
return reactive(instance.containsAsync(o));
}
Publisher<ListScanResult<V>> scanIterator(InetSocketAddress client, long startPos) {
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(InetSocketAddress client, long startPos) {
return reactive(instance.scanIteratorAsync(client, startPos));
}
@ -84,7 +85,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
}
};

@ -26,8 +26,10 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
/**
@ -66,8 +68,8 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
return reactive(instance.containsAsync(o));
}
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos);
}
@Override
@ -156,7 +158,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};

@ -16,13 +16,16 @@
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -32,28 +35,27 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private List<V> firstValues;
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private boolean finished;
@Override
protected void onRequest(long n) {
currentIndex = n;
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
onNext(val);
private void handle(List<ScanObjectEntry> vals) {
for (ScanObjectEntry val : vals) {
onNext((V)val.getObj());
}
}
protected void nextValues() {
final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<ScanObjectEntry>>() {
@Override
public void onSubscribe(Subscription s) {
@ -61,32 +63,68 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}
@Override
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
public void onNext(ListScanResult<ScanObjectEntry> res) {
if (finished) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
long prevIterPos = nextIterPos;
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getValues());
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
}
} else {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
if (res.getPos() == 0) {
finished = true;
m.onComplete();
return;
}
}
} else if (lastValues.removeAll(firstValues)) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
prevIterPos = -1;
finished = true;
m.onComplete();
return;
}
}
handle(res.getValues());
if (currentIndex == 0) {
return;
}
if (nextIterPos == -1) {
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
finished = true;
m.onComplete();
currentIndex = 0;
}
}
@ -97,7 +135,7 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
@Override
public void onComplete() {
if (currentIndex == 0) {
if (finished) {
return;
}
nextValues();
@ -106,7 +144,24 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}
});
}
private void free(List<ByteBuf> list) {
if (list == null) {
return;
}
for (ByteBuf byteBuf : list) {
byteBuf.release();
}
}
private List<ByteBuf> convert(List<ScanObjectEntry> list) {
List<ByteBuf> result = new ArrayList<ByteBuf>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getBuf());
}
return result;
}
protected abstract Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos);
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos);
}

@ -86,9 +86,9 @@ public class RedissonScoredSortedSetReactiveTest extends BaseReactiveTest {
@Test
public void testRemoveAsync() throws InterruptedException, ExecutionException {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0.11, 1);
set.add(0.22, 3);
set.add(0.33, 7);
sync(set.add(0.11, 1));
sync(set.add(0.22, 3));
sync(set.add(0.33, 7));
Assert.assertTrue(sync(set.remove(1)));
Assert.assertFalse(sync(set.contains(1)));

@ -75,10 +75,10 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest {
assertThat(sync(set.add("123", 1, TimeUnit.SECONDS))).isFalse();
Thread.sleep(50);
Thread.sleep(800);
assertThat(sync(set.contains("123"))).isTrue();
Thread.sleep(150);
Thread.sleep(250);
assertThat(sync(set.contains("123"))).isFalse();
}
@ -104,12 +104,15 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest {
}
@Test
public void testIteratorSequence() {
public void testIteratorSequence() throws InterruptedException {
RSetCacheReactive<Long> set = redisson.getSetCache("set");
for (int i = 0; i < 1000; i++) {
sync(set.add(Long.valueOf(i)));
set.add(Long.valueOf(i));
}
Thread.sleep(1000);
assertThat(sync(set.size())).isEqualTo(1000);
Set<Long> setCopy = new HashSet<Long>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Long.valueOf(i));

Loading…
Cancel
Save