Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java
#	redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java
pull/1821/head
Nikita 7 years ago
commit 2b2ef1ca28

@ -15,19 +15,17 @@
*/
package org.redisson;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
abstract class RedissonBaseIterator<V> extends BaseIterator<V, ScanObjectEntry> {
abstract class RedissonBaseIterator<V> extends BaseIterator<V, Object> {
@Override
protected V getValue(ScanObjectEntry entry) {
return (V) entry.getObj();
protected V getValue(Object entry) {
return (V) entry;
}
}

@ -19,19 +19,17 @@ import java.util.AbstractMap;
import java.util.Map;
import java.util.Map.Entry;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public abstract class RedissonBaseMapIterator<V> extends BaseIterator<V, Map.Entry<ScanObjectEntry, ScanObjectEntry>> {
public abstract class RedissonBaseMapIterator<V> extends BaseIterator<V, Map.Entry<Object, Object>> {
@SuppressWarnings("unchecked")
protected V getValue(final Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V)new AbstractMap.SimpleEntry(entry.getKey().getObj(), entry.getValue().getObj()) {
protected V getValue(final Map.Entry<Object, Object> entry) {
return (V)new AbstractMap.SimpleEntry(entry.getKey(), entry.getValue()) {
@Override
public Object setValue(Object value) {
@ -41,6 +39,6 @@ public abstract class RedissonBaseMapIterator<V> extends BaseIterator<V, Map.Ent
};
}
protected abstract Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value);
protected abstract Object put(Entry<Object, Object> entry, Object value);
}

@ -850,6 +850,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
final Date startDate = cronSchedule.getExpression().getNextValidTimeAfter(new Date());
if (startDate == null) {
throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date");
}
long startTime = startDate.getTime();
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression(), executorId, null);
addListener(result);

@ -32,12 +32,10 @@ import org.redisson.api.RObject;
import org.redisson.api.RType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
@ -113,12 +111,12 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null, count);
}
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
private ListScanResult<Object> scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f);
}
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
return commandExecutor.get(f);
}
@ -126,13 +124,13 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<String>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonKeys.this.delete((String)value.getObj());
protected void remove(Object value) {
RedissonKeys.this.delete((String)value);
}
};

@ -42,14 +42,12 @@ import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.mapreduce.RedissonMapReduce;
@ -348,8 +346,19 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return keySet(null);
}
@Override
public Set<K> keySet(String pattern) {
return new KeySet(pattern);
return keySet(pattern, 10);
}
@Override
public Set<K> keySet(String pattern, int count) {
return new KeySet(pattern, count);
}
@Override
public Set<K> keySet(int count) {
return keySet(null, count);
}
@Override
@ -357,8 +366,19 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return values(null);
}
@Override
public Collection<V> values(String keyPattern, int count) {
return new Values(keyPattern, count);
}
@Override
public Collection<V> values(String keyPattern) {
return new Values(keyPattern);
return values(keyPattern, 10);
}
@Override
public Collection<V> values(int count) {
return values(null, count);
}
@Override
@ -366,8 +386,19 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return entrySet(null);
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet(String keyPattern) {
return new EntrySet(keyPattern);
return entrySet(keyPattern, 10);
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet(String keyPattern, int count) {
return new EntrySet(keyPattern, count);
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet(int count) {
return entrySet(null, count);
}
@Override
@ -1014,15 +1045,20 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys));
}
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
if (pattern == null) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<MapScanResult<Object, Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
return get(f);
}
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos, "MATCH", pattern);
return get(f);
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "COUNT", count);
return f;
}
RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "MATCH", pattern, "COUNT", count);
return f;
}
@Override
@ -1101,11 +1137,11 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return h;
}
protected Iterator<K> keyIterator(String pattern) {
return new RedissonMapIterator<K>(RedissonMap.this, pattern) {
protected Iterator<K> keyIterator(String pattern, int count) {
return new RedissonMapIterator<K>(RedissonMap.this, pattern, count) {
@Override
protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
protected K getValue(java.util.Map.Entry<Object, Object> entry) {
return (K) entry.getKey();
}
};
}
@ -1113,14 +1149,16 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
class KeySet extends AbstractSet<K> {
private final String pattern;
private final int count;
public KeySet(String pattern) {
public KeySet(String pattern, int count) {
this.pattern = pattern;
this.count = count;
}
@Override
public Iterator<K> iterator() {
return keyIterator(pattern);
return keyIterator(pattern, count);
}
@Override
@ -1152,11 +1190,11 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
protected Iterator<V> valueIterator(String pattern) {
return new RedissonMapIterator<V>(RedissonMap.this, pattern) {
protected Iterator<V> valueIterator(String pattern, int count) {
return new RedissonMapIterator<V>(RedissonMap.this, pattern, count) {
@Override
protected V getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
protected V getValue(java.util.Map.Entry<Object, Object> entry) {
return (V) entry.getValue();
}
};
}
@ -1164,14 +1202,16 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
final class Values extends AbstractCollection<V> {
private final String keyPattern;
private final int count;
public Values(String keyPattern) {
public Values(String keyPattern, int count) {
this.keyPattern = keyPattern;
this.count = count;
}
@Override
public Iterator<V> iterator() {
return valueIterator(keyPattern);
return valueIterator(keyPattern, count);
}
@Override
@ -1199,8 +1239,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
protected Iterator<Map.Entry<K,V>> entryIterator(String pattern) {
return new RedissonMapIterator<Map.Entry<K, V>>(RedissonMap.this, pattern);
protected Iterator<Map.Entry<K,V>> entryIterator(String pattern, int count) {
return new RedissonMapIterator<Map.Entry<K, V>>(RedissonMap.this, pattern, count);
}
private void loadValue(final K key, final RPromise<V> result, final boolean replaceValue) {
@ -1288,13 +1328,15 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
private final String keyPattern;
private final int count;
public EntrySet(String keyPattern) {
public EntrySet(String keyPattern, int count) {
this.keyPattern = keyPattern;
this.count = count;
}
public final Iterator<Map.Entry<K,V>> iterator() {
return entryIterator(keyPattern);
return entryIterator(keyPattern, count);
}
public final boolean contains(Object o) {

@ -39,7 +39,6 @@ import org.redisson.api.map.event.MapEntryListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -52,7 +51,6 @@ import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ObjectListDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.MapCacheEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
@ -1210,28 +1208,29 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
return get(scanIteratorAsync(name, client, startPos, pattern));
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
return get(scanIteratorAsync(name, client, startPos, pattern, count));
}
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) {
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());
params.add(startPos);
if (pattern != null) {
params.add(pattern);
}
params.add(count);
RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new MapScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
RFuture<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN,
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(codec), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN,
"local result = {}; "
+ "local idleKeys = {}; "
+ "local res; "
+ "if (#ARGV == 3) then "
+ " res = redis.call('hscan', KEYS[1], ARGV[2], 'match', ARGV[3]); "
+ "if (#ARGV == 4) then "
+ " res = redis.call('hscan', KEYS[1], ARGV[2], 'match', ARGV[3], 'count', ARGV[4]); "
+ "else "
+ " res = redis.call('hscan', KEYS[1], ARGV[2]); "
+ " res = redis.call('hscan', KEYS[1], ARGV[2], 'count', ARGV[3]); "
+ "end;"
+ "local currentTime = tonumber(ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
@ -1264,12 +1263,12 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Arrays.<Object>asList(name, getTimeoutSetName(name), getIdleSetName(name)),
params.toArray());
f.addListener(new FutureListener<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>>() {
f.addListener(new FutureListener<MapCacheScanResult<Object, Object>>() {
@Override
public void operationComplete(Future<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> future)
public void operationComplete(Future<MapCacheScanResult<Object, Object>> future)
throws Exception {
if (future.isSuccess()) {
MapCacheScanResult<ScanObjectEntry, ScanObjectEntry> res = future.getNow();
MapCacheScanResult<Object, Object> res = future.getNow();
if (res.getIdleKeys().isEmpty()) {
return;
}
@ -1303,7 +1302,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
});
return (RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>>)(Object)f;
return (RFuture<MapScanResult<Object, Object>>)(Object)f;
}

@ -18,7 +18,6 @@ package org.redisson;
import java.util.Map.Entry;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -30,25 +29,27 @@ public class RedissonMapIterator<M> extends RedissonBaseMapIterator<M> {
private final RedissonMap map;
private final String pattern;
private final int count;
public RedissonMapIterator(RedissonMap map, String pattern) {
public RedissonMapIterator(RedissonMap map, String pattern, int count) {
this.map = map;
this.pattern = pattern;
this.count = count;
}
@Override
protected Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
return map.put(entry.getKey().getObj(), value);
protected Object put(Entry<Object, Object> entry, Object value) {
return map.put(entry.getKey(), value);
}
@Override
protected ScanResult<Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(map.getName(), client, nextIterPos, pattern);
protected ScanResult<Entry<Object, Object>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(map.getName(), client, nextIterPos, pattern, count);
}
@Override
protected void remove(Entry<ScanObjectEntry, ScanObjectEntry> value) {
map.fastRemove(value.getKey().getObj());
protected void remove(Entry<Object, Object> value) {
map.fastRemove(value.getKey());
}
}

@ -24,7 +24,6 @@ import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -37,7 +36,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/
abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> keysIter;
private Iterator<Map.Entry<Object, Object>> keysIter;
protected long keysIterPos = 0;
private K currentKey;
@ -75,7 +74,7 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
while (true) {
if (!keysFinished && (keysIter == null || !keysIter.hasNext())) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = map.scanIterator(client, keysIterPos);
MapScanResult<Object, Object> res = map.scanIterator(client, keysIterPos);
client = res.getRedisClient();
keysIter = res.getMap().entrySet().iterator();
keysIterPos = res.getPos();
@ -86,9 +85,9 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
}
while (keysIter.hasNext()) {
Entry<ScanObjectEntry, ScanObjectEntry> e = keysIter.next();
currentKey = (K) e.getKey().getObj();
String name = e.getValue().getObj().toString();
Entry<Object, Object> e = keysIter.next();
currentKey = (K) e.getKey();
String name = e.getValue().toString();
valuesIter = getIterator(name);
if (valuesIter.hasNext()) {
return true;

@ -18,7 +18,6 @@ package org.redisson;
import java.util.Map.Entry;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -34,18 +33,18 @@ public class RedissonMultiMapKeysIterator<V> extends RedissonBaseMapIterator<V>
this.map = map;
}
@Override
protected Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
return map.put(entry.getKey().getObj(), value);
protected Object put(Entry<Object, Object> entry, Object value) {
return map.put(entry.getKey(), value);
}
@Override
protected ScanResult<Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client, long nextIterPos) {
protected ScanResult<Entry<Object, Object>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(client, nextIterPos);
}
@Override
protected void remove(Entry<ScanObjectEntry, ScanObjectEntry> value) {
map.fastRemove(value.getKey().getObj());
protected void remove(Entry<Object, Object> value) {
map.fastRemove(value.getKey());
}
}

@ -34,12 +34,11 @@ import org.redisson.api.RReadWriteLock;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Hash;
@ -301,8 +300,8 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
MapScanResult<Object, Object> scanIterator(RedisClient client, long startPos) {
RFuture<MapScanResult<Object, Object>> f = commandExecutor.readAsync(client, getName(), new CompositeCodec(codec, StringCodec.INSTANCE, codec), RedisCommands.HSCAN, getName(), startPos);
return get(f);
}
@ -316,8 +315,8 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
public Iterator<K> iterator() {
return new RedissonMultiMapKeysIterator<K>(RedissonMultimap.this) {
@Override
protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
protected K getValue(java.util.Map.Entry<Object, Object> entry) {
return (K) entry.getKey();
}
};
}

@ -24,8 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet;
@ -36,13 +36,11 @@ import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
@ -394,23 +392,42 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o));
}
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
private ListScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "COUNT", count);
return get(f);
}
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "MATCH", pattern, "COUNT", count);
return get(f);
}
@Override
public Iterator<V> iterator() {
return iterator(null, 10);
}
@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}
@Override
public Iterator<V> iterator(int count) {
return iterator(null, count);
}
@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos);
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern, count);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonScoredSortedSet.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonScoredSortedSet.this.remove((V)value);
}
};

@ -31,10 +31,8 @@ import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.Hash;
@ -94,28 +92,38 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count);
return get(f);
}
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos, "MATCH", pattern);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern, "COUNT", count);
return get(f);
}
@Override
public Iterator<V> iterator(final String pattern) {
public Iterator<V> iterator(int count) {
return iterator(null, count);
}
@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}
@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern);
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern, count);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonSet.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonSet.this.remove((V)value);
}
};
@ -567,8 +575,8 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
}
@Override
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern) {
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern, int count) {
throw new UnsupportedOperationException();
}

@ -31,10 +31,8 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
@ -123,27 +121,28 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(name, client, startPos, pattern);
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
return get(f);
}
@Override
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) {
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>();
params.add(startPos);
params.add(System.currentTimeMillis());
if (pattern != null) {
params.add(pattern);
}
params.add(count);
return commandExecutor.evalReadAsync(client, name, new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
return commandExecutor.evalReadAsync(client, name, codec, RedisCommands.EVAL_ZSCAN,
"local result = {}; "
+ "local res; "
+ "if (#ARGV == 3) then "
+ " res = redis.call('zscan', KEYS[1], ARGV[1], 'match', ARGV[3]); "
+ "if (#ARGV == 4) then "
+ " res = redis.call('zscan', KEYS[1], ARGV[1], 'match', ARGV[3], 'count', ARGV[4]); "
+ "else "
+ " res = redis.call('zscan', KEYS[1], ARGV[1]); "
+ " res = redis.call('zscan', KEYS[1], ARGV[1], 'count', ARGV[3]); "
+ "end;"
+ "for i, value in ipairs(res[2]) do "
+ "if i % 2 == 0 then "
@ -157,17 +156,27 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
@Override
public Iterator<V> iterator(final String pattern) {
public Iterator<V> iterator(int count) {
return iterator(null, count);
}
@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}
@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern);
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern, count);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonSetCache.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonSetCache.this.remove((V)value);
}
};

@ -30,7 +30,6 @@ import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -39,7 +38,6 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -167,7 +165,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(o));
}
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos, String pattern) {
private ListScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());
params.add(startPos);
@ -175,8 +173,9 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
if (pattern != null) {
params.add(pattern);
}
params.add(count);
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new MapScanCodec(codec), EVAL_SSCAN,
RFuture<ListScanResult<Object>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false then "
@ -187,10 +186,10 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
+ "end;"
+ "local res; "
+ "if (#ARGV == 4) then "
+ "res = redis.call('sscan', KEYS[2], ARGV[2], 'match', ARGV[4]); "
+ "if (#ARGV == 5) then "
+ "res = redis.call('sscan', KEYS[2], ARGV[2], 'match', ARGV[4], 'count', ARGV[5]); "
+ "else "
+ "res = redis.call('sscan', KEYS[2], ARGV[2]); "
+ "res = redis.call('sscan', KEYS[2], ARGV[2], 'count', ARGV[4]); "
+ "end;"
+ "return res;",
@ -199,17 +198,28 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return get(f);
}
public Iterator<V> iterator(final String pattern) {
@Override
public Iterator<V> iterator(int count) {
return iterator(null, count);
}
@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}
@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern);
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern, count);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonSetMultimapValues.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonSetMultimapValues.this.remove((V)value);
}
};

@ -18,7 +18,6 @@ package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -27,9 +26,9 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/
public interface ScanIterator {
ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern);
ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count);
RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern);
RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count);
boolean remove(Object value);

@ -304,8 +304,10 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
Map<K, V> readAllMap();
/**
* Returns key set.
* This method <b>DOESN'T</b> fetch all of them as {@link #readAllKeySet()} does.
* Returns key set of this map.
* Keys are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllKeySet()
*
* @return key set
*/
@ -313,8 +315,20 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
Set<K> keySet();
/**
* Returns key set matches pattern.
* This method <b>DOESN'T</b> fetch all of them as {@link #readAllKeySet()} does.
* Returns key set of this map.
* Keys are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllKeySet()
*
* @param count - size of keys batch
* @return key set
*/
Set<K> keySet(int count);
/**
* Returns key set of this map.
* If <code>pattern</code> is not null then only keys match this pattern are loaded.
* Keys are loaded in batch. Batch size is defined by <code>count</code> param.
*
* Supported glob-style patterns:
* <p>
@ -324,24 +338,48 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllKeySet()
*
* @param pattern - key pattern
* @param count - size of keys batch
* @return key set
*/
Set<K> keySet(String pattern);
Set<K> keySet(String pattern, int count);
/**
* Returns key set of this map.
* If <code>pattern</code> is not null then only keys match this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllKeySet()
*
* @param pattern - key pattern
* @return key set
*/
Set<K> keySet(String pattern);
/**
* Returns values collection.
* This method <b>DOESN'T</b> fetch all of them as {@link #readAllValues()} does.
* Returns values collection of this map.
* Values are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllValues()
*
* @return value collection
* @return values collection
*/
@Override
Collection<V> values();
/**
* Returns values collection matches key pattern.
* This method <b>DOESN'T</b> fetch all of them as {@link #readAllValues()} does.
* Returns values collection of this map.
* Values are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
@ -351,14 +389,50 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllValues()
*
* @param keyPattern - key pattern
* @return value collection
* @return values collection
*/
Collection<V> values(String keyPattern);
/**
* Returns values collection of this map.
* Values are loaded in batch. Batch size is defined by <code>count</code> param.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllValues()
*
* @param keyPattern - key pattern
* @param count - size of values batch
* @return values collection
*/
Collection<V> values(String keyPattern, int count);
/**
* Returns values collection of this map.
* Values are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllValues()
*
* @param count - size of values batch
* @return values collection
*/
Collection<V> values(int count);
/**
* Returns map entries collection.
* This method <b>DOESN'T</b> fetch all of them as {@link #readAllEntrySet()} does.
* Map entries are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllEntrySet()
*
* @return map entries collection
*/
@ -366,8 +440,9 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
Set<java.util.Map.Entry<K, V>> entrySet();
/**
* Returns map entries collection matches key pattern.
* This method <b>DOESN'T</b> fetch all of them as {@link #readAllEntrySet()} does.
* Returns map entries collection.
* Map entries are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
@ -377,9 +452,43 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllEntrySet()
*
* @param keyPattern - key pattern
* @return map entries collection
*/
Set<java.util.Map.Entry<K, V>> entrySet(String keyPattern);
/**
* Returns map entries collection.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllEntrySet()
*
* @param keyPattern - key pattern
* @param count - size of entries batch
* @return map entries collection
*/
Set<java.util.Map.Entry<K, V>> entrySet(String keyPattern, int count);
/**
* Returns map entries collection.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllEntrySet()
*
* @param count - size of entries batch
* @return map entries collection
*/
Set<java.util.Map.Entry<K, V>> entrySet(int count);
}

@ -264,6 +264,7 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V> {
/**
* Remaining time to live of map entry associated with a <code>key</code>.
*
* @param key - map key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.

@ -224,6 +224,7 @@ public interface RMapCacheAsync<K, V> extends RMapAsync<K, V> {
/**
* Remaining time to live of map entry associated with a <code>key</code>.
*
* @param key - map key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.

@ -17,8 +17,8 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.redisson.api.map.MapLoader;
@ -252,10 +252,189 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
*/
Publisher<V> putIfAbsent(K key, V value);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllEntrySet()
*
* @return iterator
*/
Publisher<Map.Entry<K, V>> entryIterator();
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllEntrySet()
*
* @param count - size of entries batch
* @return iterator
*/
Publisher<Map.Entry<K, V>> entryIterator(int count);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllEntrySet()
*
* @param pattern - key pattern
* @return iterator
*/
Publisher<Map.Entry<K, V>> entryIterator(String pattern);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllEntrySet()
*
* @param pattern - key pattern
* @param count - size of entries batch
* @return iterator
*/
Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count);
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllValues()
*
* @return iterator
*/
Publisher<V> valueIterator();
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllValues()
*
* @param count - size of values batch
* @return iterator
*/
Publisher<V> valueIterator(int count);
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllValues()
*
* @param pattern - key pattern
* @return iterator
*/
Publisher<V> valueIterator(String pattern);
/**
* Returns iterator over values collection of this map.
* Values are loaded in batch. Batch size is defined by <code>count</code> param.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllValues()
*
* @param pattern - key pattern
* @param count - size of values batch
* @return iterator
*/
Publisher<V> valueIterator(String pattern, int count);
/**
* Returns iterator over key set of this map.
* Keys are loaded in batch. Batch size is <code>10</code>.
*
* @see #readAllKeySet()
*
* @return iterator
*/
Publisher<K> keyIterator();
/**
* Returns iterator over key set of this map.
* Keys are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @see #readAllKeySet()
*
* @param count - size of keys batch
* @return iterator
*/
Publisher<K> keyIterator(int count);
/**
* Returns iterator over key set of this map.
* If <code>pattern</code> is not null then only keys match this pattern are loaded.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllKeySet()
*
* @param pattern - key pattern
* @return iterator
*/
Publisher<K> keyIterator(String pattern);
/**
* Returns iterator over key set of this map.
* If <code>pattern</code> is not null then only keys match this pattern are loaded.
* Keys are loaded in batch. Batch size is defined by <code>count</code> param.
*
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @see #readAllKeySet()
*
* @param pattern - key pattern
* @param count - size of keys batch
* @return iterator
*/
Publisher<K> keyIterator(String pattern, int count);
}

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -229,10 +230,49 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
boolean tryAdd(double score, V object);
/**
* Returns size of this set.
*
* @return size
*/
int size();
/**
* Returns <code>true</code> if this set is empty
*
* @return <code>true</code> if empty
*/
boolean isEmpty();
/**
* Returns an iterator over elements in this set.
* If <code>pattern</code> is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @return iterator
*/
Iterator<V> iterator(String pattern);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(int count);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(String pattern, int count);
boolean contains(Object o);
Object[] toArray();

@ -38,9 +38,29 @@ public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set
RLock getLock(V value);
/**
* Returns values iterator matches <code>pattern</code>.
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param pattern for values
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(int count);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(String pattern, int count);
/**
* Returns iterator over elements in this set matches <code>pattern</code>.
*
* @param pattern - search pattern
* @return iterator
*/
Iterator<V> iterator(String pattern);

@ -48,10 +48,30 @@ public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V> {
*/
RLock getLock(V value);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(int count);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(String pattern, int count);
/**
* Returns values iterator matches <code>pattern</code>.
*
* @param pattern for values
* @param pattern - search pattern
* @return iterator
*/
Iterator<V> iterator(String pattern);

@ -1,110 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
public class MapScanCodec implements Codec {
private final Codec delegate;
private final Codec mapValueCodec;
public MapScanCodec(Codec delegate) {
this(delegate, null);
}
public MapScanCodec(Codec delegate, Codec mapValueCodec) {
this.delegate = delegate;
this.mapValueCodec = mapValueCodec;
}
@Override
public Decoder<Object> getValueDecoder() {
return delegate.getValueDecoder();
}
@Override
public Encoder getValueEncoder() {
return delegate.getValueEncoder();
}
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}
@Override
public Encoder getMapValueEncoder() {
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
return c.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}
@Override
public Encoder getMapKeyEncoder() {
return delegate.getMapKeyEncoder();
}
@Override
public ClassLoader getClassLoader() {
return getClass().getClassLoader();
}
}

@ -1,86 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
public class ScanCodec implements Codec {
private final Codec delegate;
public ScanCodec(Codec delegate) {
this.delegate = delegate;
}
@Override
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Object val = delegate.getValueDecoder().decode(buf, state);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}
@Override
public Encoder getValueEncoder() {
return delegate.getValueEncoder();
}
@Override
public Decoder<Object> getMapValueDecoder() {
return delegate.getMapValueDecoder();
}
@Override
public Encoder getMapValueEncoder() {
return delegate.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return delegate.getMapKeyDecoder();
}
@Override
public Encoder getMapKeyEncoder() {
return delegate.getMapKeyEncoder();
}
@Override
public ClassLoader getClassLoader() {
return delegate.getClassLoader();
}
}

@ -57,7 +57,6 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void run(Timeout timeout) throws Exception {
if (future.cancel(false) || !future.isSuccess()) {
System.out.println("closed!!! " + future + " " + connection.getChannel());
ctx.channel().close();
} else {
sendPing(ctx);

@ -1,43 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.misc.HashValue;
/**
*
* @author Nikita Koksharov
*
*/
public class ScanObjectEntry {
private final HashValue hash;
private final Object obj;
public ScanObjectEntry(HashValue hash, Object obj) {
this.hash = hash;
this.obj = obj;
}
public HashValue getHash() {
return hash;
}
public Object getObj() {
return obj;
}
}

@ -29,9 +29,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.ScanResult;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -55,7 +55,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
@ -1005,8 +1004,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Map oldMap = ((MapScanResult) o).getMap();
Map map = tryHandleReference(oldMap);
if (map != oldMap) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> newScanResult
= new MapScanResult<ScanObjectEntry, ScanObjectEntry>(scanResult.getPos(), map);
MapScanResult<Object, Object> newScanResult
= new MapScanResult<Object, Object>(scanResult.getPos(), map);
newScanResult.setRedisClient(scanResult.getRedisClient());
return (T) newScanResult;
} else {
@ -1023,10 +1022,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = ((ScoredEntry<?>) o);
return (T) new ScoredEntry(se.getScore(), fromReference(se.getValue()));
} else if (o instanceof ScanObjectEntry) {
ScanObjectEntry keyScan = (ScanObjectEntry) o;
Object obj = tryHandleReference0(keyScan.getObj());
return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getHash(), obj) : o;
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = tryHandleReference0(old.getKey());

@ -17,7 +17,6 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@ -348,10 +347,6 @@ public class MasterSlaveEntry {
return addSlave(client, freezed, nodeType);
}
public ClientConnectionsEntry getSlaveEntry(RedisClient client) {
return slaveBalancer.getEntry(client);
}
public Collection<ClientConnectionsEntry> getAllEntries() {
return slaveBalancer.getEntries();
}
@ -453,8 +448,8 @@ public class MasterSlaveEntry {
oldMaster.freezeMaster(FreezeReason.MANAGER);
slaveDown(oldMaster);
slaveBalancer.changeType(oldMaster.getClient(), NodeType.SLAVE);
slaveBalancer.changeType(newMasterClient, NodeType.MASTER);
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE);
slaveBalancer.changeType(newMasterClient.getAddr(), NodeType.MASTER);
// more than one slave available, so master can be removed from slaves
if (!config.checkSkipSlavesInit()

@ -66,12 +66,8 @@ public class LoadBalancerManager {
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
}
public void changeType(RedisClient redisClient, NodeType nodeType) {
ClientConnectionsEntry entry = getEntry(redisClient);
changeType(nodeType, entry);
}
protected void changeType(NodeType nodeType, ClientConnectionsEntry entry) {
public void changeType(InetSocketAddress address, NodeType nodeType) {
ClientConnectionsEntry entry = getEntry(address);
if (entry != null) {
if (connectionManager.isClusterMode()) {
entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER);
@ -200,7 +196,7 @@ public class LoadBalancerManager {
return getEntry(redisClient) != null;
}
protected ClientConnectionsEntry getEntry(URI addr) {
private ClientConnectionsEntry getEntry(URI addr) {
for (ClientConnectionsEntry entry : client2Entry.values()) {
InetSocketAddress entryAddr = entry.getClient().getAddr();
if (URIBuilder.compare(entryAddr, addr)) {
@ -210,7 +206,7 @@ public class LoadBalancerManager {
return null;
}
protected ClientConnectionsEntry getEntry(InetSocketAddress address) {
private ClientConnectionsEntry getEntry(InetSocketAddress address) {
for (ClientConnectionsEntry entry : client2Entry.values()) {
InetSocketAddress addr = entry.getClient().getAddr();
if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) {
@ -220,7 +216,7 @@ public class LoadBalancerManager {
return null;
}
public ClientConnectionsEntry getEntry(RedisClient redisClient) {
private ClientConnectionsEntry getEntry(RedisClient redisClient) {
return client2Entry.get(redisClient);
}

@ -121,12 +121,18 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public void schedule(String className, byte[] classBody, byte[] state, long startTime, String cronExpression, String executorId, String requestId) {
Date nextStartDate = new CronExpression(cronExpression).getNextValidTimeAfter(new Date());
RFuture<Void> future = asyncScheduledServiceAtFixed(executorId, requestId).schedule(className, classBody, state, nextStartDate.getTime(), cronExpression, executorId, requestId);
RFuture<Void> future = null;
if (nextStartDate != null) {
RemoteExecutorServiceAsync service = asyncScheduledServiceAtFixed(executorId, requestId);
future = service.schedule(className, classBody, state, nextStartDate.getTime(), cronExpression, executorId, requestId);
}
try {
executeRunnable(className, classBody, state, requestId);
} catch (RuntimeException e) {
// cancel task if it throws an exception
if (future != null) {
future.cancel(true);
}
throw e;
}
}

@ -60,12 +60,10 @@ import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.jcache.JMutableEntry.Action;
import org.redisson.jcache.configuration.JCacheConfiguration;
@ -73,7 +71,6 @@ import org.redisson.misc.Hash;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
* JCache implementation
@ -2084,9 +2081,9 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime);
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos) {
RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos);
try {
return get(f);
} catch (Exception e) {
@ -2097,22 +2094,22 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
protected Iterator<K> keyIterator() {
return new RedissonBaseMapIterator<K>() {
@Override
protected K getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
protected K getValue(Map.Entry<Object, Object> entry) {
return (K) entry.getKey();
}
@Override
protected void remove(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> value) {
protected void remove(java.util.Map.Entry<Object, Object> value) {
throw new UnsupportedOperationException();
}
@Override
protected Object put(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
protected Object put(java.util.Map.Entry<Object, Object> entry, Object value) {
throw new UnsupportedOperationException();
}
@Override
protected ScanResult<java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client,
protected ScanResult<java.util.Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos);
}
@ -2416,32 +2413,32 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
checkNotClosed();
return new RedissonBaseMapIterator<javax.cache.Cache.Entry<K, V>>() {
@Override
protected Cache.Entry<K, V> getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
protected Cache.Entry<K, V> getValue(Map.Entry<Object, Object> entry) {
cacheManager.getStatBean(JCache.this).addHits(1);
Long accessTimeout = getAccessTimeout();
JCacheEntry<K, V> je = new JCacheEntry<K, V>((K) entry.getKey().getObj(), (V) entry.getValue().getObj());
JCacheEntry<K, V> je = new JCacheEntry<K, V>((K) entry.getKey(), (V) entry.getValue());
if (accessTimeout == 0) {
remove();
} else if (accessTimeout != -1) {
write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, encodeMapKey(entry.getKey().getObj()));
write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, encodeMapKey(entry.getKey()));
}
return je;
}
@Override
protected void remove(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
JCache.this.remove((K) entry.getKey().getObj());
protected void remove(Map.Entry<Object, Object> entry) {
JCache.this.remove((K) entry.getKey());
}
@Override
protected Object put(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
protected Object put(java.util.Map.Entry<Object, Object> entry, Object value) {
throw new UnsupportedOperationException();
}
@Override
protected ScanResult<java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client,
protected ScanResult<java.util.Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos);
}

@ -18,7 +18,6 @@ package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -29,7 +28,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/
interface MapReactive<K, V> {
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos);
Publisher<MapScanResult<Object, Object>> scanIteratorReactive(RedisClient client, long startPos, String pattern, int count);
Publisher<V> put(K key, V value);

@ -34,7 +34,6 @@ import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
@ -203,11 +202,11 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
}
@Override
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>>>() {
public Publisher<MapScanResult<Object, Object>> scanIteratorReactive(final RedisClient client, final long startPos, final String pattern, final int count) {
return reactive(new Supplier<RFuture<MapScanResult<Object, Object>>>() {
@Override
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> get() {
return ((RedissonMapCache<K, V>)mapCache).scanIteratorAsync(getName(), client, startPos, null);
public RFuture<MapScanResult<Object, Object>> get() {
return ((RedissonMapCache<K, V>)mapCache).scanIteratorAsync(getName(), client, startPos, pattern, count);
}
});
}
@ -324,25 +323,70 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
@Override
public Publisher<Map.Entry<K, V>> entryIterator() {
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this));
return entryIterator(null);
}
@Override
public Publisher<Map.Entry<K, V>> entryIterator(int count) {
return entryIterator(null, count);
}
@Override
public Publisher<Map.Entry<K, V>> entryIterator(String pattern) {
return entryIterator(pattern, 10);
}
@Override
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this, pattern, count));
}
@Override
public Publisher<V> valueIterator() {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>(this) {
return valueIterator(null);
}
@Override
public Publisher<V> valueIterator(String pattern) {
return valueIterator(pattern, 10);
}
@Override
V getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
public Publisher<V> valueIterator(int count) {
return valueIterator(null, count);
}
@Override
public Publisher<V> valueIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>(this, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
}
});
}
@Override
public Publisher<K> keyIterator() {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>(this) {
return keyIterator(null);
}
@Override
public Publisher<K> keyIterator(String pattern) {
return keyIterator(pattern, 10);
}
@Override
public Publisher<K> keyIterator(int count) {
return keyIterator(null, count);
}
@Override
public Publisher<K> keyIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>(this, pattern, count) {
@Override
K getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
}
});
}

@ -31,10 +31,7 @@ import org.redisson.api.RMapAsync;
import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.publisher.Flux;
@ -292,31 +289,81 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
});
}
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
@Override
public Publisher<MapScanResult<Object, Object>> scanIteratorReactive(final RedisClient client, final long startPos, final String pattern, final int count) {
return reactive(new Supplier<RFuture<MapScanResult<Object, Object>>>() {
@Override
public RFuture<MapScanResult<Object, Object>> get() {
return ((RedissonMap<K, V>)instance).scanIteratorAsync(getName(), client, startPos, pattern, count);
}
});
}
@Override
public Publisher<Map.Entry<K, V>> entryIterator() {
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this));
return entryIterator(null);
}
@Override
public Publisher<Entry<K, V>> entryIterator(int count) {
return entryIterator(null, count);
}
@Override
public Publisher<Entry<K, V>> entryIterator(String pattern) {
return entryIterator(pattern, 10);
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this, pattern, count));
}
@Override
public Publisher<V> valueIterator() {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>(this) {
return valueIterator(null);
}
@Override
public Publisher<V> valueIterator(String pattern) {
return valueIterator(pattern, 10);
}
@Override
public Publisher<V> valueIterator(int count) {
return valueIterator(null, count);
}
@Override
public Publisher<V> valueIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>(this, pattern, count) {
@Override
V getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
}
});
}
@Override
public Publisher<K> keyIterator() {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>(this) {
return keyIterator(null);
}
@Override
public Publisher<K> keyIterator(String pattern) {
return keyIterator(pattern, 10);
}
@Override
public Publisher<K> keyIterator(int count) {
return keyIterator(null, count);
}
@Override
public Publisher<K> keyIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>(this, pattern, count) {
@Override
K getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
}
});
}

@ -16,8 +16,6 @@
package org.redisson.reactive;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@ -28,8 +26,6 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.HashValue;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
@ -45,17 +41,19 @@ import reactor.core.publisher.Mono;
public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M>> {
private final MapReactive<K, V> map;
private final String pattern;
private final int count;
public RedissonMapReactiveIterator(MapReactive<K, V> map) {
public RedissonMapReactiveIterator(MapReactive<K, V> map, String pattern, int count) {
this.map = map;
this.pattern = pattern;
this.count = count;
}
@Override
public void accept(FluxSink<M> emitter) {
emitter.onRequest(new LongConsumer() {
private Map<HashValue, HashValue> firstValues;
private Map<HashValue, HashValue> lastValues;
private long nextIterPos;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
@ -74,14 +72,7 @@ public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M
};
protected void nextValues(FluxSink<M> emitter) {
map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<MapScanResult<ScanObjectEntry, ScanObjectEntry>>() {
private Map<HashValue, HashValue> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<HashValue, HashValue> result = new HashMap<HashValue, HashValue>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getHash(), entry.getValue().getHash());
}
return result;
}
map.scanIteratorReactive(client, nextIterPos, pattern, count).subscribe(new Subscriber<MapScanResult<Object, Object>>() {
@Override
public void onSubscribe(Subscription s) {
@ -89,7 +80,7 @@ public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M
}
@Override
public void onNext(MapScanResult<ScanObjectEntry, ScanObjectEntry> res) {
public void onNext(MapScanResult<Object, Object> res) {
if (finished) {
client = null;
nextIterPos = 0;
@ -99,7 +90,7 @@ public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : res.getMap().entrySet()) {
for (Entry<Object, Object> entry : res.getMap().entrySet()) {
M val = getValue(entry);
emitter.next(val);
elementsRead.incrementAndGet();
@ -139,12 +130,12 @@ public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M
return false;
}
M getValue(final Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) {
M getValue(final Entry<Object, Object> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey(), (V)entry.getValue()) {
@Override
public V setValue(V value) {
Publisher<V> publisher = map.put((K) entry.getKey().getObj(), value);
Publisher<V> publisher = map.put((K) entry.getKey(), value);
return Mono.from(publisher).block();
}

@ -27,11 +27,9 @@ import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.publisher.Flux;
@ -184,15 +182,15 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
});
}
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
private Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
});

@ -33,7 +33,6 @@ import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
@ -103,11 +102,11 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
});
}
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() {
Publisher<ListScanResult<Object>> scanIterator(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<ListScanResult<Object>>>() {
@Override
public RFuture<ListScanResult<ScanObjectEntry>> get() {
return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null);
public RFuture<ListScanResult<Object>> get() {
return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null, 10);
}
});
}
@ -116,7 +115,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
}
});

@ -29,10 +29,8 @@ import org.redisson.api.RSetAsync;
import org.redisson.api.RSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.publisher.Flux;
@ -112,8 +110,8 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
});
}
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos);
private Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
}
@Override
@ -257,7 +255,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
});

@ -24,7 +24,6 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import reactor.core.publisher.FluxSink;
@ -58,7 +57,7 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
}
protected void nextValues(FluxSink<V> emitter) {
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<ScanObjectEntry>>() {
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<Object>>() {
@Override
public void onSubscribe(Subscription s) {
@ -66,7 +65,7 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
}
@Override
public void onNext(ListScanResult<ScanObjectEntry> res) {
public void onNext(ListScanResult<Object> res) {
if (finished) {
client = null;
nextIterPos = 0;
@ -76,8 +75,8 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
client = res.getRedisClient();
nextIterPos = res.getPos();
for (ScanObjectEntry val : res.getValues()) {
emitter.next((V)val.getObj());
for (Object val : res.getValues()) {
emitter.next((V)val);
elementsRead.incrementAndGet();
}
@ -114,6 +113,6 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
return false;
}
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos);
protected abstract Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos);
}

@ -38,7 +38,6 @@ import org.redisson.api.RMap;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
@ -116,15 +115,6 @@ public class BaseTransactionalMap<K, V> {
}
}
private HashValue toValueHash(Object value) {
ByteBuf keyState = ((RedissonObject)map).encodeMapValue(value);
try {
return new HashValue(Hash.hash128(keyState));
} finally {
keyState.release();
}
}
public RFuture<Boolean> isExistsAsync() {
if (deleted != null) {
return RedissonPromise.newSucceededFuture(!deleted);
@ -202,21 +192,20 @@ public class BaseTransactionalMap<K, V> {
return result;
}
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = ((RedissonMap<?, ?>)map).scanIterator(name, client, startPos, pattern);
protected MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
MapScanResult<Object, Object> res = ((RedissonMap<?, ?>)map).scanIterator(name, client, startPos, pattern, count);
Map<HashValue, MapEntry> newstate = new HashMap<HashValue, MapEntry>(state);
for (Iterator<ScanObjectEntry> iterator = res.getMap().keySet().iterator(); iterator.hasNext();) {
ScanObjectEntry entry = iterator.next();
MapEntry mapEntry = newstate.remove(entry.getHash());
for (Iterator<Object> iterator = res.getMap().keySet().iterator(); iterator.hasNext();) {
Object entry = iterator.next();
MapEntry mapEntry = newstate.remove(toKeyHash(entry));
if (mapEntry != null) {
if (mapEntry == MapEntry.NULL) {
iterator.remove();
continue;
}
HashValue valueHash = toValueHash(mapEntry.getValue());
res.getMap().put(entry, new ScanObjectEntry(valueHash, mapEntry.getValue()));
res.getMap().put(entry, mapEntry.getValue());
}
}
@ -226,9 +215,7 @@ public class BaseTransactionalMap<K, V> {
continue;
}
ScanObjectEntry key = new ScanObjectEntry(entry.getKey(), entry.getValue().getKey());
ScanObjectEntry value = new ScanObjectEntry(toValueHash(entry.getValue().getValue()), entry.getValue().getValue());
res.getMap().put(key, value);
res.getMap().put(entry.getValue().getKey(), entry.getValue().getValue());
}
}

@ -36,7 +36,6 @@ import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
@ -176,16 +175,16 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return set.containsAsync(value);
}
protected abstract ListScanResult<ScanObjectEntry> scanIteratorSource(String name, RedisClient client,
long startPos, String pattern);
protected abstract ListScanResult<Object> scanIteratorSource(String name, RedisClient client,
long startPos, String pattern, int count);
protected ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
ListScanResult<ScanObjectEntry> res = scanIteratorSource(name, client, startPos, pattern);
protected ListScanResult<Object> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
ListScanResult<Object> res = scanIteratorSource(name, client, startPos, pattern, count);
Map<HashValue, Object> newstate = new HashMap<HashValue, Object>(state);
for (Iterator<ScanObjectEntry> iterator = res.getValues().iterator(); iterator.hasNext();) {
ScanObjectEntry entry = iterator.next();
Object value = newstate.remove(entry.getHash());
for (Iterator<Object> iterator = res.getValues().iterator(); iterator.hasNext();) {
Object entry = iterator.next();
Object value = newstate.remove(toHash(entry));
if (value == NULL) {
iterator.remove();
}
@ -196,7 +195,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
if (entry.getValue() == NULL) {
continue;
}
res.getValues().add(new ScanObjectEntry(entry.getKey(), entry.getValue()));
res.getValues().add(entry.getValue());
}
}

@ -32,7 +32,6 @@ import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -107,10 +106,10 @@ public class RedissonTransactionalMap<K, V> extends RedissonMap<K, V> {
}
@Override
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
checkState();
return transactionalMap.scanIterator(name, client, startPos, pattern);
return transactionalMap.scanIterator(name, client, startPos, pattern, count);
}
@Override

@ -31,7 +31,6 @@ import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -156,10 +155,10 @@ public class RedissonTransactionalMapCache<K, V> extends RedissonMapCache<K, V>
}
@Override
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
checkState();
return transactionalMap.scanIterator(name, client, startPos, pattern);
return transactionalMap.scanIterator(name, client, startPos, pattern, count);
}
@Override

@ -29,7 +29,6 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -96,9 +95,9 @@ public class RedissonTransactionalSet<V> extends RedissonSet<V> {
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern);
return transactionalSet.scanIterator(name, client, startPos, pattern, count);
}
@Override

@ -28,7 +28,6 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -96,9 +95,9 @@ public class RedissonTransactionalSetCache<V> extends RedissonSetCache<V> {
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern);
return transactionalSet.scanIterator(name, client, startPos, pattern, count);
}
@Override

@ -24,7 +24,6 @@ import org.redisson.api.RLock;
import org.redisson.api.RSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.set.AddOperation;
@ -48,9 +47,9 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
}
@Override
protected ListScanResult<ScanObjectEntry> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern) {
return ((RedissonSet<?>)set).scanIterator(name, client, startPos, pattern);
protected ListScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern, int count) {
return ((RedissonSet<?>)set).scanIterator(name, client, startPos, pattern, count);
}
@Override

@ -25,7 +25,6 @@ import org.redisson.api.RLock;
import org.redisson.api.RSetCache;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.set.AddCacheOperation;
@ -49,9 +48,9 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
}
@Override
protected ListScanResult<ScanObjectEntry> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern) {
return ((RedissonSetCache<?>)set).scanIterator(name, client, startPos, pattern);
protected ListScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern, int count) {
return ((RedissonSetCache<?>)set).scanIterator(name, client, startPos, pattern, count);
}
@Override

@ -7,11 +7,12 @@ import static org.redisson.BaseTest.createInstance;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -32,8 +33,8 @@ import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode;
import org.redisson.api.Node;
import org.redisson.api.NodeType;
import org.redisson.api.Node.InfoSection;
import org.redisson.api.NodeType;
import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
@ -46,8 +47,6 @@ import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.codec.JsonJacksonCodec;
@ -57,7 +56,6 @@ import org.redisson.connection.CRC16;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.HashValue;
public class RedissonTest {
@ -113,67 +111,6 @@ public class RedissonTest {
localRedisson.shutdown();
}
@Test
public void testIteratorNotLooped() {
RedissonBaseIterator iter = new RedissonBaseIterator() {
int i;
@Override
protected ListScanResult iterator(RedisClient client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult(13L, Collections.emptyList());
}
if (i == 2) {
return new ListScanResult(0L, Collections.emptyList());
}
Assert.fail();
return null;
}
@Override
protected void remove(Object value) {
}
};
Assert.assertFalse(iter.hasNext());
}
@Test
public void testIteratorNotLooped2() {
RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() {
int i;
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(new HashValue(new long[]{1L}) , 1)));
}
if (i == 2) {
return new ListScanResult(7L, Collections.emptyList());
}
if (i == 3) {
return new ListScanResult(0L, Collections.emptyList());
}
if (i == 4) {
return new ListScanResult(14L, Collections.emptyList());
}
Assert.fail();
return null;
}
@Override
protected void remove(ScanObjectEntry value) {
}
};
Assert.assertTrue(iter.hasNext());
assertThat(iter.next()).isEqualTo(1);
Assert.assertFalse(iter.hasNext());
}
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
@ -447,6 +384,121 @@ public class RedissonTest {
slave2.stop();
}
// @Test
public void testFailoverInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
RedisRunner slave4 = new RedisRunner().port(6903).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1, slave4)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 2000; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
futures.add(f1);
futures.add(f2);
futures.add(f3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (i % 100 == 0) {
System.out.println("step: " + i);
}
}
latch.countDown();
};
};
t.start();
t.join(1000);
Set<InetSocketAddress> addresses = new HashSet<>();
Collection<ClusterNode> masterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : masterNodes) {
addresses.add(clusterNode.getAddr());
}
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(80));
RedisProcess newMaster = null;
Collection<ClusterNode> newMasterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : newMasterNodes) {
if (!addresses.contains(clusterNode.getAddr())) {
newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get();
break;
}
System.out.println("new-master: " + clusterNode.getAddr());
}
Thread.sleep(50000);
newMaster.stop();
System.out.println("new master " + newMaster.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
Thread.sleep(60000);
latch.await();
int errors = 0;
int success = 0;
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
if (!rFuture.isSuccess()) {
errors++;
} else {
success++;
}
}
System.out.println("errors " + errors + " success " + success);
for (RFuture<?> rFuture : futures) {
if (rFuture.isSuccess()) {
System.out.println(rFuture.isSuccess());
} else {
rFuture.cause().printStackTrace();
}
}
assertThat(readonlyErrors).isZero();
redisson.shutdown();
process.shutdown();
}
@Test
public void testReconnection() throws IOException, InterruptedException, TimeoutException {

@ -7,11 +7,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -114,22 +111,22 @@ public class RedissonTopicTest {
config.useSingleServer().setPingConnectionInterval(50);
RedissonClient redisson = Redisson.create(config);
Set<String> sentItems = new HashSet<>();
Set<String> receivedItems = new HashSet<>();
int count = 1000;
CountDownLatch latch = new CountDownLatch(count);
RTopic<String> eventsTopic = redisson.getTopic("eventsTopic");
eventsTopic.addListener((channel, msg) -> receivedItems.add(msg));
eventsTopic.addListener((channel, msg) -> {
latch.countDown();
});
for(int i = 0; i<1000; i++){
for(int i = 0; i<count; i++){
final String message = UUID.randomUUID().toString();
eventsTopic.publish(message);
sentItems.add(message);
Thread.sleep(10);
}
Thread.sleep(2000);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(sentItems).hasSameSizeAs(receivedItems);
redisson.shutdown();
}

@ -138,12 +138,18 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(2);
}
@Test(expected = IllegalArgumentException.class)
public void testWrongCronExpression() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.schedule(new ScheduledRunnableTask("executed"), CronSchedule.of("0 44 12 19 JUN ? 2018"));
}
@Test
public void testCronExpressionMultipleTasks() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.schedule(new ScheduledRunnableTask("executed1"), CronSchedule.of("0/5 * * * * ?"));
executor.schedule(new ScheduledRunnableTask("executed2"), CronSchedule.of("0/1 * * * * ?"));
Thread.sleep(30000);
Thread.sleep(30200);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(6);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(30);
}

Loading…
Cancel
Save