refactoring

pull/1506/merge
Nikita 7 years ago
parent 14b39b20ff
commit 7086b16ebf

@ -15,19 +15,17 @@
*/
package org.redisson;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
abstract class RedissonBaseIterator<V> extends BaseIterator<V, ScanObjectEntry> {
abstract class RedissonBaseIterator<V> extends BaseIterator<V, Object> {
@Override
protected V getValue(ScanObjectEntry entry) {
return (V) entry.getObj();
protected V getValue(Object entry) {
return (V) entry;
}
}

@ -19,19 +19,17 @@ import java.util.AbstractMap;
import java.util.Map;
import java.util.Map.Entry;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public abstract class RedissonBaseMapIterator<V> extends BaseIterator<V, Map.Entry<ScanObjectEntry, ScanObjectEntry>> {
public abstract class RedissonBaseMapIterator<V> extends BaseIterator<V, Map.Entry<Object, Object>> {
@SuppressWarnings("unchecked")
protected V getValue(final Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V)new AbstractMap.SimpleEntry(entry.getKey().getObj(), entry.getValue().getObj()) {
protected V getValue(final Map.Entry<Object, Object> entry) {
return (V)new AbstractMap.SimpleEntry(entry.getKey(), entry.getValue()) {
@Override
public Object setValue(Object value) {
@ -41,6 +39,6 @@ public abstract class RedissonBaseMapIterator<V> extends BaseIterator<V, Map.Ent
};
}
protected abstract Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value);
protected abstract Object put(Entry<Object, Object> entry, Object value);
}

@ -32,12 +32,10 @@ import org.redisson.api.RObject;
import org.redisson.api.RType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
@ -113,12 +111,12 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null, count);
}
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
private ListScanResult<Object> scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f);
}
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
return commandExecutor.get(f);
}
@ -126,13 +124,13 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<String>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonKeys.this.delete((String)value.getObj());
protected void remove(Object value) {
RedissonKeys.this.delete((String)value);
}
};

@ -42,14 +42,12 @@ import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.mapreduce.RedissonMapReduce;
@ -1014,14 +1012,14 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys));
}
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern) {
if (pattern == null) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos);
return get(f);
}
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos, "MATCH", pattern);
RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "MATCH", pattern);
return get(f);
}
@ -1104,8 +1102,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
protected Iterator<K> keyIterator(String pattern) {
return new RedissonMapIterator<K>(RedissonMap.this, pattern) {
@Override
protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
protected K getValue(java.util.Map.Entry<Object, Object> entry) {
return (K) entry.getKey();
}
};
}
@ -1155,8 +1153,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
protected Iterator<V> valueIterator(String pattern) {
return new RedissonMapIterator<V>(RedissonMap.this, pattern) {
@Override
protected V getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
protected V getValue(java.util.Map.Entry<Object, Object> entry) {
return (V) entry.getValue();
}
};
}

@ -39,7 +39,6 @@ import org.redisson.api.map.event.MapEntryListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -52,7 +51,6 @@ import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ObjectListDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.MapCacheEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
@ -1210,11 +1208,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern) {
return get(scanIteratorAsync(name, client, startPos, pattern));
}
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) {
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());
params.add(startPos);
@ -1223,8 +1221,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new MapScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
RFuture<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN,
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(codec), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN,
"local result = {}; "
+ "local idleKeys = {}; "
+ "local res; "
@ -1264,12 +1262,12 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Arrays.<Object>asList(name, getTimeoutSetName(name), getIdleSetName(name)),
params.toArray());
f.addListener(new FutureListener<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>>() {
f.addListener(new FutureListener<MapCacheScanResult<Object, Object>>() {
@Override
public void operationComplete(Future<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> future)
public void operationComplete(Future<MapCacheScanResult<Object, Object>> future)
throws Exception {
if (future.isSuccess()) {
MapCacheScanResult<ScanObjectEntry, ScanObjectEntry> res = future.getNow();
MapCacheScanResult<Object, Object> res = future.getNow();
if (res.getIdleKeys().isEmpty()) {
return;
}
@ -1303,7 +1301,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
});
return (RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>>)(Object)f;
return (RFuture<MapScanResult<Object, Object>>)(Object)f;
}

@ -18,7 +18,6 @@ package org.redisson;
import java.util.Map.Entry;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -37,18 +36,18 @@ public class RedissonMapIterator<M> extends RedissonBaseMapIterator<M> {
}
@Override
protected Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
return map.put(entry.getKey().getObj(), value);
protected Object put(Entry<Object, Object> entry, Object value) {
return map.put(entry.getKey(), value);
}
@Override
protected ScanResult<Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client, long nextIterPos) {
protected ScanResult<Entry<Object, Object>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(map.getName(), client, nextIterPos, pattern);
}
@Override
protected void remove(Entry<ScanObjectEntry, ScanObjectEntry> value) {
map.fastRemove(value.getKey().getObj());
protected void remove(Entry<Object, Object> value) {
map.fastRemove(value.getKey());
}
}

@ -24,7 +24,6 @@ import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -37,7 +36,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/
abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> keysIter;
private Iterator<Map.Entry<Object, Object>> keysIter;
protected long keysIterPos = 0;
private K currentKey;
@ -75,7 +74,7 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
while (true) {
if (!keysFinished && (keysIter == null || !keysIter.hasNext())) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = map.scanIterator(client, keysIterPos);
MapScanResult<Object, Object> res = map.scanIterator(client, keysIterPos);
client = res.getRedisClient();
keysIter = res.getMap().entrySet().iterator();
keysIterPos = res.getPos();
@ -86,9 +85,9 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
}
while (keysIter.hasNext()) {
Entry<ScanObjectEntry, ScanObjectEntry> e = keysIter.next();
currentKey = (K) e.getKey().getObj();
String name = e.getValue().getObj().toString();
Entry<Object, Object> e = keysIter.next();
currentKey = (K) e.getKey();
String name = e.getValue().toString();
valuesIter = getIterator(name);
if (valuesIter.hasNext()) {
return true;

@ -18,7 +18,6 @@ package org.redisson;
import java.util.Map.Entry;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -34,18 +33,18 @@ public class RedissonMultiMapKeysIterator<V> extends RedissonBaseMapIterator<V>
this.map = map;
}
@Override
protected Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
return map.put(entry.getKey().getObj(), value);
protected Object put(Entry<Object, Object> entry, Object value) {
return map.put(entry.getKey(), value);
}
@Override
protected ScanResult<Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client, long nextIterPos) {
protected ScanResult<Entry<Object, Object>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(client, nextIterPos);
}
@Override
protected void remove(Entry<ScanObjectEntry, ScanObjectEntry> value) {
map.fastRemove(value.getKey().getObj());
protected void remove(Entry<Object, Object> value) {
map.fastRemove(value.getKey());
}
}

@ -34,12 +34,11 @@ import org.redisson.api.RReadWriteLock;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Hash;
@ -301,8 +300,8 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
MapScanResult<Object, Object> scanIterator(RedisClient client, long startPos) {
RFuture<MapScanResult<Object, Object>> f = commandExecutor.readAsync(client, getName(), new CompositeCodec(codec, StringCodec.INSTANCE, codec), RedisCommands.HSCAN, getName(), startPos);
return get(f);
}
@ -316,8 +315,8 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
public Iterator<K> iterator() {
return new RedissonMultiMapKeysIterator<K>(RedissonMultimap.this) {
@Override
protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
protected K getValue(java.util.Map.Entry<Object, Object> entry) {
return (K) entry.getKey();
}
};
}

@ -24,8 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet;
@ -36,13 +36,11 @@ import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
@ -394,8 +392,8 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o));
}
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
private ListScanResult<Object> scanIterator(RedisClient client, long startPos) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
return get(f);
}
@ -404,13 +402,13 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonScoredSortedSet.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonScoredSortedSet.this.remove((V)value);
}
};

@ -31,10 +31,8 @@ import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.Hash;
@ -94,13 +92,13 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern) {
if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos);
return get(f);
}
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos, "MATCH", pattern);
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern);
return get(f);
}
@ -109,13 +107,13 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonSet.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonSet.this.remove((V)value);
}
};
@ -567,7 +565,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
}
@Override
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos,
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern) {
throw new UnsupportedOperationException();
}

@ -31,10 +31,8 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
@ -123,13 +121,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(name, client, startPos, pattern);
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, pattern);
return get(f);
}
@Override
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) {
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>();
params.add(startPos);
params.add(System.currentTimeMillis());
@ -137,7 +135,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
params.add(pattern);
}
return commandExecutor.evalReadAsync(client, name, new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
return commandExecutor.evalReadAsync(client, name, codec, RedisCommands.EVAL_ZSCAN,
"local result = {}; "
+ "local res; "
+ "if (#ARGV == 3) then "
@ -161,13 +159,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonSetCache.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonSetCache.this.remove((V)value);
}
};

@ -30,7 +30,6 @@ import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -39,7 +38,6 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -167,7 +165,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(o));
}
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos, String pattern) {
private ListScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());
params.add(startPos);
@ -176,7 +174,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
params.add(pattern);
}
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new MapScanCodec(codec), EVAL_SSCAN,
RFuture<ListScanResult<Object>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false then "
@ -203,13 +201,13 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern);
}
@Override
protected void remove(ScanObjectEntry value) {
RedissonSetMultimapValues.this.remove((V)value.getObj());
protected void remove(Object value) {
RedissonSetMultimapValues.this.remove((V)value);
}
};

@ -18,7 +18,6 @@ package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -27,9 +26,9 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/
public interface ScanIterator {
ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern);
ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern);
RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern);
RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern);
boolean remove(Object value);

@ -1,110 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
public class MapScanCodec implements Codec {
private final Codec delegate;
private final Codec mapValueCodec;
public MapScanCodec(Codec delegate) {
this(delegate, null);
}
public MapScanCodec(Codec delegate, Codec mapValueCodec) {
this.delegate = delegate;
this.mapValueCodec = mapValueCodec;
}
@Override
public Decoder<Object> getValueDecoder() {
return delegate.getValueDecoder();
}
@Override
public Encoder getValueEncoder() {
return delegate.getValueEncoder();
}
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}
@Override
public Encoder getMapValueEncoder() {
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
return c.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}
@Override
public Encoder getMapKeyEncoder() {
return delegate.getMapKeyEncoder();
}
@Override
public ClassLoader getClassLoader() {
return getClass().getClassLoader();
}
}

@ -1,86 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
public class ScanCodec implements Codec {
private final Codec delegate;
public ScanCodec(Codec delegate) {
this.delegate = delegate;
}
@Override
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Object val = delegate.getValueDecoder().decode(buf, state);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}
@Override
public Encoder getValueEncoder() {
return delegate.getValueEncoder();
}
@Override
public Decoder<Object> getMapValueDecoder() {
return delegate.getMapValueDecoder();
}
@Override
public Encoder getMapValueEncoder() {
return delegate.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return delegate.getMapKeyDecoder();
}
@Override
public Encoder getMapKeyEncoder() {
return delegate.getMapKeyEncoder();
}
@Override
public ClassLoader getClassLoader() {
return delegate.getClassLoader();
}
}

@ -57,7 +57,6 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void run(Timeout timeout) throws Exception {
if (future.cancel(false) || !future.isSuccess()) {
System.out.println("closed!!! " + future + " " + connection.getChannel());
ctx.channel().close();
} else {
sendPing(ctx);

@ -1,43 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.misc.HashValue;
/**
*
* @author Nikita Koksharov
*
*/
public class ScanObjectEntry {
private final HashValue hash;
private final Object obj;
public ScanObjectEntry(HashValue hash, Object obj) {
this.hash = hash;
this.obj = obj;
}
public HashValue getHash() {
return hash;
}
public Object getObj() {
return obj;
}
}

@ -29,9 +29,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.ScanResult;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -55,7 +55,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
@ -1005,8 +1004,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Map oldMap = ((MapScanResult) o).getMap();
Map map = tryHandleReference(oldMap);
if (map != oldMap) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> newScanResult
= new MapScanResult<ScanObjectEntry, ScanObjectEntry>(scanResult.getPos(), map);
MapScanResult<Object, Object> newScanResult
= new MapScanResult<Object, Object>(scanResult.getPos(), map);
newScanResult.setRedisClient(scanResult.getRedisClient());
return (T) newScanResult;
} else {
@ -1023,10 +1022,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = ((ScoredEntry<?>) o);
return (T) new ScoredEntry(se.getScore(), fromReference(se.getValue()));
} else if (o instanceof ScanObjectEntry) {
ScanObjectEntry keyScan = (ScanObjectEntry) o;
Object obj = tryHandleReference0(keyScan.getObj());
return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getHash(), obj) : o;
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = tryHandleReference0(old.getKey());

@ -60,12 +60,10 @@ import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.jcache.JMutableEntry.Action;
import org.redisson.jcache.configuration.JCacheConfiguration;
@ -73,7 +71,6 @@ import org.redisson.misc.Hash;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
* JCache implementation
@ -2084,9 +2081,9 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime);
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos) {
RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos);
try {
return get(f);
} catch (Exception e) {
@ -2097,22 +2094,22 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
protected Iterator<K> keyIterator() {
return new RedissonBaseMapIterator<K>() {
@Override
protected K getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
protected K getValue(Map.Entry<Object, Object> entry) {
return (K) entry.getKey();
}
@Override
protected void remove(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> value) {
protected void remove(java.util.Map.Entry<Object, Object> value) {
throw new UnsupportedOperationException();
}
@Override
protected Object put(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
protected Object put(java.util.Map.Entry<Object, Object> entry, Object value) {
throw new UnsupportedOperationException();
}
@Override
protected ScanResult<java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client,
protected ScanResult<java.util.Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos);
}
@ -2416,32 +2413,32 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
checkNotClosed();
return new RedissonBaseMapIterator<javax.cache.Cache.Entry<K, V>>() {
@Override
protected Cache.Entry<K, V> getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
protected Cache.Entry<K, V> getValue(Map.Entry<Object, Object> entry) {
cacheManager.getStatBean(JCache.this).addHits(1);
Long accessTimeout = getAccessTimeout();
JCacheEntry<K, V> je = new JCacheEntry<K, V>((K) entry.getKey().getObj(), (V) entry.getValue().getObj());
JCacheEntry<K, V> je = new JCacheEntry<K, V>((K) entry.getKey(), (V) entry.getValue());
if (accessTimeout == 0) {
remove();
} else if (accessTimeout != -1) {
write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, encodeMapKey(entry.getKey().getObj()));
write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, encodeMapKey(entry.getKey()));
}
return je;
}
@Override
protected void remove(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
JCache.this.remove((K) entry.getKey().getObj());
protected void remove(Map.Entry<Object, Object> entry) {
JCache.this.remove((K) entry.getKey());
}
@Override
protected Object put(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
protected Object put(java.util.Map.Entry<Object, Object> entry, Object value) {
throw new UnsupportedOperationException();
}
@Override
protected ScanResult<java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client,
protected ScanResult<java.util.Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos);
}

@ -18,7 +18,6 @@ package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
/**
*
@ -29,7 +28,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/
interface MapReactive<K, V> {
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos);
Publisher<MapScanResult<Object, Object>> scanIteratorReactive(RedisClient client, long startPos);
Publisher<V> put(K key, V value);

@ -31,7 +31,6 @@ import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
@ -202,10 +201,10 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
}
@Override
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>>>() {
public Publisher<MapScanResult<Object, Object>> scanIteratorReactive(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<MapScanResult<Object, Object>>>() {
@Override
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> get() {
public RFuture<MapScanResult<Object, Object>> get() {
return ((RedissonMapCache<K, V>)mapCache).scanIteratorAsync(getName(), client, startPos, null);
}
});
@ -330,8 +329,8 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
public Publisher<V> valueIterator() {
return new RedissonMapReactiveIterator<K, V, V>(this) {
@Override
V getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
}
}.stream();
}
@ -340,8 +339,8 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
public Publisher<K> keyIterator() {
return new RedissonMapReactiveIterator<K, V, K>(this) {
@Override
K getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
}
}.stream();
}

@ -28,10 +28,8 @@ import org.redisson.api.RMapAsync;
import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.BiFunction;
@ -292,8 +290,8 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
});
}
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
public Publisher<MapScanResult<Object, Object>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos);
}
@Override
@ -305,8 +303,8 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
public Publisher<V> valueIterator() {
return new RedissonMapReactiveIterator<K, V, V>(this) {
@Override
V getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
}
}.stream();
}
@ -315,8 +313,8 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
public Publisher<K> keyIterator() {
return new RedissonMapReactiveIterator<K, V, K>(this) {
@Override
K getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
}
}.stream();
}

@ -23,7 +23,6 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -64,7 +63,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
protected void nextValues() {
final ReactiveSubscription<M> m = this;
map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<MapScanResult<ScanObjectEntry, ScanObjectEntry>>() {
map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<MapScanResult<Object, Object>>() {
@Override
public void onSubscribe(Subscription s) {
@ -72,7 +71,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
}
@Override
public void onNext(MapScanResult<ScanObjectEntry, ScanObjectEntry> res) {
public void onNext(MapScanResult<Object, Object> res) {
if (currentIndex == 0) {
client = null;
nextIterPos = 0;
@ -82,7 +81,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : res.getMap().entrySet()) {
for (Entry<Object, Object> entry : res.getMap().entrySet()) {
M val = getValue(entry);
m.onNext(val);
currentIndex--;
@ -119,12 +118,12 @@ public class RedissonMapReactiveIterator<K, V, M> {
}
M getValue(final Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) {
M getValue(final Entry<Object, Object> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey(), (V)entry.getValue()) {
@Override
public V setValue(V value) {
Publisher<V> publisher = map.put((K) entry.getKey().getObj(), value);
Publisher<V> publisher = map.put((K) entry.getKey(), value);
return ((Stream<V>)publisher).next().poll();
}

@ -26,11 +26,9 @@ import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
@ -183,15 +181,15 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
});
}
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
private Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};

@ -32,7 +32,6 @@ import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
@ -102,10 +101,10 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
});
}
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() {
Publisher<ListScanResult<Object>> scanIterator(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<ListScanResult<Object>>>() {
@Override
public RFuture<ListScanResult<ScanObjectEntry>> get() {
public RFuture<ListScanResult<Object>> get() {
return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null);
}
});
@ -115,7 +114,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
}
};

@ -28,10 +28,8 @@ import org.redisson.api.RSetAsync;
import org.redisson.api.RSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
@ -111,8 +109,8 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
});
}
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos);
private Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
}
@Override
@ -256,7 +254,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};

@ -20,7 +20,6 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -49,7 +48,7 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
protected void nextValues() {
final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<ScanObjectEntry>>() {
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<Object>>() {
@Override
public void onSubscribe(Subscription s) {
@ -57,7 +56,7 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}
@Override
public void onNext(ListScanResult<ScanObjectEntry> res) {
public void onNext(ListScanResult<Object> res) {
if (finished) {
client = null;
nextIterPos = 0;
@ -67,8 +66,8 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
client = res.getRedisClient();
nextIterPos = res.getPos();
for (ScanObjectEntry val : res.getValues()) {
m.onNext((V)val.getObj());
for (Object val : res.getValues()) {
m.onNext((V)val);
}
if (res.getPos() == 0) {
@ -94,6 +93,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
});
}
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos);
protected abstract Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos);
}

@ -38,7 +38,6 @@ import org.redisson.api.RMap;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
@ -116,15 +115,6 @@ public class BaseTransactionalMap<K, V> {
}
}
private HashValue toValueHash(Object value) {
ByteBuf keyState = ((RedissonObject)map).encodeMapValue(value);
try {
return new HashValue(Hash.hash128(keyState));
} finally {
keyState.release();
}
}
public RFuture<Boolean> isExistsAsync() {
if (deleted != null) {
return RedissonPromise.newSucceededFuture(!deleted);
@ -202,21 +192,20 @@ public class BaseTransactionalMap<K, V> {
return result;
}
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client,
protected MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = ((RedissonMap<?, ?>)map).scanIterator(name, client, startPos, pattern);
MapScanResult<Object, Object> res = ((RedissonMap<?, ?>)map).scanIterator(name, client, startPos, pattern);
Map<HashValue, MapEntry> newstate = new HashMap<HashValue, MapEntry>(state);
for (Iterator<ScanObjectEntry> iterator = res.getMap().keySet().iterator(); iterator.hasNext();) {
ScanObjectEntry entry = iterator.next();
MapEntry mapEntry = newstate.remove(entry.getHash());
for (Iterator<Object> iterator = res.getMap().keySet().iterator(); iterator.hasNext();) {
Object entry = iterator.next();
MapEntry mapEntry = newstate.remove(toKeyHash(entry));
if (mapEntry != null) {
if (mapEntry == MapEntry.NULL) {
iterator.remove();
continue;
}
HashValue valueHash = toValueHash(mapEntry.getValue());
res.getMap().put(entry, new ScanObjectEntry(valueHash, mapEntry.getValue()));
res.getMap().put(entry, mapEntry.getValue());
}
}
@ -226,9 +215,7 @@ public class BaseTransactionalMap<K, V> {
continue;
}
ScanObjectEntry key = new ScanObjectEntry(entry.getKey(), entry.getValue().getKey());
ScanObjectEntry value = new ScanObjectEntry(toValueHash(entry.getValue().getValue()), entry.getValue().getValue());
res.getMap().put(key, value);
res.getMap().put(entry.getValue().getKey(), entry.getValue().getValue());
}
}

@ -36,7 +36,6 @@ import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
@ -176,16 +175,16 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return set.containsAsync(value);
}
protected abstract ListScanResult<ScanObjectEntry> scanIteratorSource(String name, RedisClient client,
protected abstract ListScanResult<Object> scanIteratorSource(String name, RedisClient client,
long startPos, String pattern);
protected ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client,
protected ListScanResult<Object> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
ListScanResult<ScanObjectEntry> res = scanIteratorSource(name, client, startPos, pattern);
ListScanResult<Object> res = scanIteratorSource(name, client, startPos, pattern);
Map<HashValue, Object> newstate = new HashMap<HashValue, Object>(state);
for (Iterator<ScanObjectEntry> iterator = res.getValues().iterator(); iterator.hasNext();) {
ScanObjectEntry entry = iterator.next();
Object value = newstate.remove(entry.getHash());
for (Iterator<Object> iterator = res.getValues().iterator(); iterator.hasNext();) {
Object entry = iterator.next();
Object value = newstate.remove(toHash(entry));
if (value == NULL) {
iterator.remove();
}
@ -196,7 +195,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
if (entry.getValue() == NULL) {
continue;
}
res.getValues().add(new ScanObjectEntry(entry.getKey(), entry.getValue()));
res.getValues().add(entry.getValue());
}
}

@ -32,7 +32,6 @@ import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -107,7 +106,7 @@ public class RedissonTransactionalMap<K, V> extends RedissonMap<K, V> {
}
@Override
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client,
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
checkState();
return transactionalMap.scanIterator(name, client, startPos, pattern);

@ -31,7 +31,6 @@ import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -156,7 +155,7 @@ public class RedissonTransactionalMapCache<K, V> extends RedissonMapCache<K, V>
}
@Override
public MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client,
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern) {
checkState();
return transactionalMap.scanIterator(name, client, startPos, pattern);

@ -29,7 +29,6 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -96,7 +95,7 @@ public class RedissonTransactionalSet<V> extends RedissonSet<V> {
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern) {
checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern);
}

@ -28,7 +28,6 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -96,7 +95,7 @@ public class RedissonTransactionalSetCache<V> extends RedissonSetCache<V> {
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern) {
checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern);
}

@ -24,7 +24,6 @@ import org.redisson.api.RLock;
import org.redisson.api.RSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.set.AddOperation;
@ -48,7 +47,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
}
@Override
protected ListScanResult<ScanObjectEntry> scanIteratorSource(String name, RedisClient client, long startPos,
protected ListScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern) {
return ((RedissonSet<?>)set).scanIterator(name, client, startPos, pattern);
}

@ -25,7 +25,6 @@ import org.redisson.api.RLock;
import org.redisson.api.RSetCache;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.set.AddCacheOperation;
@ -49,7 +48,7 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
}
@Override
protected ListScanResult<ScanObjectEntry> scanIteratorSource(String name, RedisClient client, long startPos,
protected ListScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern) {
return ((RedissonSetCache<?>)set).scanIterator(name, client, startPos, pattern);
}

@ -7,11 +7,12 @@ import static org.redisson.BaseTest.createInstance;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -32,8 +33,8 @@ import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode;
import org.redisson.api.Node;
import org.redisson.api.NodeType;
import org.redisson.api.Node.InfoSection;
import org.redisson.api.NodeType;
import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
@ -46,8 +47,6 @@ import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.codec.JsonJacksonCodec;
@ -57,7 +56,6 @@ import org.redisson.connection.CRC16;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.HashValue;
public class RedissonTest {
@ -113,67 +111,6 @@ public class RedissonTest {
localRedisson.shutdown();
}
@Test
public void testIteratorNotLooped() {
RedissonBaseIterator iter = new RedissonBaseIterator() {
int i;
@Override
protected ListScanResult iterator(RedisClient client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult(13L, Collections.emptyList());
}
if (i == 2) {
return new ListScanResult(0L, Collections.emptyList());
}
Assert.fail();
return null;
}
@Override
protected void remove(Object value) {
}
};
Assert.assertFalse(iter.hasNext());
}
@Test
public void testIteratorNotLooped2() {
RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() {
int i;
@Override
protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(new HashValue(new long[]{1L}) , 1)));
}
if (i == 2) {
return new ListScanResult(7L, Collections.emptyList());
}
if (i == 3) {
return new ListScanResult(0L, Collections.emptyList());
}
if (i == 4) {
return new ListScanResult(14L, Collections.emptyList());
}
Assert.fail();
return null;
}
@Override
protected void remove(ScanObjectEntry value) {
}
};
Assert.assertTrue(iter.hasNext());
assertThat(iter.next()).isEqualTo(1);
Assert.assertFalse(iter.hasNext());
}
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
@ -447,6 +384,121 @@ public class RedissonTest {
slave2.stop();
}
// @Test
public void testFailoverInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
RedisRunner slave4 = new RedisRunner().port(6903).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1, slave4)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 2000; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
futures.add(f1);
futures.add(f2);
futures.add(f3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (i % 100 == 0) {
System.out.println("step: " + i);
}
}
latch.countDown();
};
};
t.start();
t.join(1000);
Set<InetSocketAddress> addresses = new HashSet<>();
Collection<ClusterNode> masterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : masterNodes) {
addresses.add(clusterNode.getAddr());
}
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(80));
RedisProcess newMaster = null;
Collection<ClusterNode> newMasterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : newMasterNodes) {
if (!addresses.contains(clusterNode.getAddr())) {
newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get();
break;
}
System.out.println("new-master: " + clusterNode.getAddr());
}
Thread.sleep(50000);
newMaster.stop();
System.out.println("new master " + newMaster.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
Thread.sleep(60000);
latch.await();
int errors = 0;
int success = 0;
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
if (!rFuture.isSuccess()) {
errors++;
} else {
success++;
}
}
System.out.println("errors " + errors + " success " + success);
for (RFuture<?> rFuture : futures) {
if (rFuture.isSuccess()) {
System.out.println(rFuture.isSuccess());
} else {
rFuture.cause().printStackTrace();
}
}
assertThat(readonlyErrors).isZero();
redisson.shutdown();
process.shutdown();
}
@Test
public void testReconnection() throws IOException, InterruptedException, TimeoutException {

@ -7,11 +7,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -114,22 +111,22 @@ public class RedissonTopicTest {
config.useSingleServer().setPingConnectionInterval(50);
RedissonClient redisson = Redisson.create(config);
Set<String> sentItems = new HashSet<>();
Set<String> receivedItems = new HashSet<>();
int count = 1000;
CountDownLatch latch = new CountDownLatch(count);
RTopic<String> eventsTopic = redisson.getTopic("eventsTopic");
eventsTopic.addListener((channel, msg) -> receivedItems.add(msg));
eventsTopic.addListener((channel, msg) -> {
latch.countDown();
});
for(int i = 0; i<1000; i++){
for(int i = 0; i<count; i++){
final String message = UUID.randomUUID().toString();
eventsTopic.publish(message);
sentItems.add(message);
Thread.sleep(10);
}
Thread.sleep(2000);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(sentItems).hasSameSizeAs(receivedItems);
redisson.shutdown();
}

@ -143,7 +143,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.schedule(new ScheduledRunnableTask("executed1"), CronSchedule.of("0/5 * * * * ?"));
executor.schedule(new ScheduledRunnableTask("executed2"), CronSchedule.of("0/1 * * * * ?"));
Thread.sleep(30000);
Thread.sleep(30200);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(6);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(30);
}

Loading…
Cancel
Save