From 7086b16ebf91d0823d94dcfc971b546ac76b4a05 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 25 Jun 2018 22:38:58 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonBaseIterator.java | 8 +- .../org/redisson/RedissonBaseMapIterator.java | 10 +- .../main/java/org/redisson/RedissonKeys.java | 14 +- .../main/java/org/redisson/RedissonMap.java | 20 +- .../java/org/redisson/RedissonMapCache.java | 18 +- .../org/redisson/RedissonMapIterator.java | 11 +- .../redisson/RedissonMultiMapIterator.java | 11 +- .../RedissonMultiMapKeysIterator.java | 11 +- .../java/org/redisson/RedissonMultimap.java | 11 +- .../org/redisson/RedissonScoredSortedSet.java | 14 +- .../main/java/org/redisson/RedissonSet.java | 16 +- .../java/org/redisson/RedissonSetCache.java | 16 +- .../redisson/RedissonSetMultimapValues.java | 12 +- .../main/java/org/redisson/ScanIterator.java | 5 +- .../redisson/client/codec/MapScanCodec.java | 110 ----------- .../org/redisson/client/codec/ScanCodec.java | 86 -------- .../client/handler/PingConnectionHandler.java | 1 - .../protocol/decoder/ScanObjectEntry.java | 43 ---- .../redisson/command/CommandAsyncService.java | 11 +- .../main/java/org/redisson/jcache/JCache.java | 33 ++-- .../org/redisson/reactive/MapReactive.java | 3 +- .../reactive/RedissonMapCacheReactive.java | 15 +- .../reactive/RedissonMapReactive.java | 14 +- .../reactive/RedissonMapReactiveIterator.java | 13 +- .../RedissonScoredSortedSetReactive.java | 8 +- .../reactive/RedissonSetCacheReactive.java | 9 +- .../reactive/RedissonSetReactive.java | 8 +- .../reactive/SetReactiveIterator.java | 11 +- .../transaction/BaseTransactionalMap.java | 27 +-- .../transaction/BaseTransactionalSet.java | 15 +- .../transaction/RedissonTransactionalMap.java | 3 +- .../RedissonTransactionalMapCache.java | 3 +- .../transaction/RedissonTransactionalSet.java | 3 +- .../RedissonTransactionalSetCache.java | 3 +- .../transaction/TransactionalSet.java | 3 +- .../transaction/TransactionalSetCache.java | 3 +- .../test/java/org/redisson/RedissonTest.java | 186 +++++++++++------- .../java/org/redisson/RedissonTopicTest.java | 17 +- .../RedissonScheduledExecutorServiceTest.java | 2 +- 39 files changed, 277 insertions(+), 530 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java delete mode 100644 redisson/src/main/java/org/redisson/client/codec/ScanCodec.java delete mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java diff --git a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java index 64457d537..54751c1f9 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java @@ -15,19 +15,17 @@ */ package org.redisson; -import org.redisson.client.protocol.decoder.ScanObjectEntry; - /** * * @author Nikita Koksharov * * @param value type */ -abstract class RedissonBaseIterator extends BaseIterator { +abstract class RedissonBaseIterator extends BaseIterator { @Override - protected V getValue(ScanObjectEntry entry) { - return (V) entry.getObj(); + protected V getValue(Object entry) { + return (V) entry; } } diff --git a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java index 5b0d33003..a2f4ce1cf 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java @@ -19,19 +19,17 @@ import java.util.AbstractMap; import java.util.Map; import java.util.Map.Entry; -import org.redisson.client.protocol.decoder.ScanObjectEntry; - /** * * @author Nikita Koksharov * * @param value type */ -public abstract class RedissonBaseMapIterator extends BaseIterator> { +public abstract class RedissonBaseMapIterator extends BaseIterator> { @SuppressWarnings("unchecked") - protected V getValue(final Map.Entry entry) { - return (V)new AbstractMap.SimpleEntry(entry.getKey().getObj(), entry.getValue().getObj()) { + protected V getValue(final Map.Entry entry) { + return (V)new AbstractMap.SimpleEntry(entry.getKey(), entry.getValue()) { @Override public Object setValue(Object value) { @@ -41,6 +39,6 @@ public abstract class RedissonBaseMapIterator extends BaseIterator entry, Object value); + protected abstract Object put(Entry entry, Object value); } diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index f83ca4795..fa4db6bb5 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -32,12 +32,10 @@ import org.redisson.api.RObject; import org.redisson.api.RType; import org.redisson.client.RedisClient; import org.redisson.client.RedisException; -import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; @@ -113,12 +111,12 @@ public class RedissonKeys implements RKeys { return getKeysByPattern(null, count); } - private ListScanResult scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { + private ListScanResult scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { - RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); return commandExecutor.get(f); } - RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); return commandExecutor.get(f); } @@ -126,13 +124,13 @@ public class RedissonKeys implements RKeys { return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ListScanResult iterator(RedisClient client, long nextIterPos) { return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); } @Override - protected void remove(ScanObjectEntry value) { - RedissonKeys.this.delete((String)value.getObj()); + protected void remove(Object value) { + RedissonKeys.this.delete((String)value); } }; diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 685e4d2d7..904e36193 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -42,14 +42,12 @@ import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.mapreduce.RedissonMapReduce; @@ -1014,14 +1012,14 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(fastRemoveAsync(keys)); } - public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { + public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { if (pattern == null) { - RFuture> f - = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); + RFuture> f + = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos); return get(f); } - RFuture> f - = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos, "MATCH", pattern); + RFuture> f + = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "MATCH", pattern); return get(f); } @@ -1104,8 +1102,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { protected Iterator keyIterator(String pattern) { return new RedissonMapIterator(RedissonMap.this, pattern) { @Override - protected K getValue(java.util.Map.Entry entry) { - return (K) entry.getKey().getObj(); + protected K getValue(java.util.Map.Entry entry) { + return (K) entry.getKey(); } }; } @@ -1155,8 +1153,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { protected Iterator valueIterator(String pattern) { return new RedissonMapIterator(RedissonMap.this, pattern) { @Override - protected V getValue(java.util.Map.Entry entry) { - return (V) entry.getValue().getObj(); + protected V getValue(java.util.Map.Entry entry) { + return (V) entry.getValue(); } }; } diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 6e2f4a19a..a75be49b5 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -39,7 +39,6 @@ import org.redisson.api.map.event.MapEntryListener; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -52,7 +51,6 @@ import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ObjectListDecoder; import org.redisson.client.protocol.decoder.ObjectMapDecoder; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.codec.MapCacheEventCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; @@ -1210,11 +1208,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { + public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { return get(scanIteratorAsync(name, client, startPos, pattern)); } - public RFuture> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) { + public RFuture> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(System.currentTimeMillis()); params.add(startPos); @@ -1223,8 +1221,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", - new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new MapScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); - RFuture> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN, + new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(codec), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); + RFuture> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN, "local result = {}; " + "local idleKeys = {}; " + "local res; " @@ -1264,12 +1262,12 @@ public class RedissonMapCache extends RedissonMap implements RMapCac Arrays.asList(name, getTimeoutSetName(name), getIdleSetName(name)), params.toArray()); - f.addListener(new FutureListener>() { + f.addListener(new FutureListener>() { @Override - public void operationComplete(Future> future) + public void operationComplete(Future> future) throws Exception { if (future.isSuccess()) { - MapCacheScanResult res = future.getNow(); + MapCacheScanResult res = future.getNow(); if (res.getIdleKeys().isEmpty()) { return; } @@ -1303,7 +1301,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } }); - return (RFuture>)(Object)f; + return (RFuture>)(Object)f; } diff --git a/redisson/src/main/java/org/redisson/RedissonMapIterator.java b/redisson/src/main/java/org/redisson/RedissonMapIterator.java index 6846260cc..be7748c14 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonMapIterator.java @@ -18,7 +18,6 @@ package org.redisson; import java.util.Map.Entry; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ScanObjectEntry; /** * @@ -37,18 +36,18 @@ public class RedissonMapIterator extends RedissonBaseMapIterator { } @Override - protected Object put(Entry entry, Object value) { - return map.put(entry.getKey().getObj(), value); + protected Object put(Entry entry, Object value) { + return map.put(entry.getKey(), value); } @Override - protected ScanResult> iterator(RedisClient client, long nextIterPos) { + protected ScanResult> iterator(RedisClient client, long nextIterPos) { return map.scanIterator(map.getName(), client, nextIterPos, pattern); } @Override - protected void remove(Entry value) { - map.fastRemove(value.getKey().getObj()); + protected void remove(Entry value) { + map.fastRemove(value.getKey()); } } diff --git a/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java b/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java index 359ba8c5f..124fa742c 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java @@ -24,7 +24,6 @@ import java.util.NoSuchElementException; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -37,7 +36,7 @@ import org.redisson.command.CommandAsyncExecutor; */ abstract class RedissonMultiMapIterator implements Iterator { - private Iterator> keysIter; + private Iterator> keysIter; protected long keysIterPos = 0; private K currentKey; @@ -75,7 +74,7 @@ abstract class RedissonMultiMapIterator implements Iterator { while (true) { if (!keysFinished && (keysIter == null || !keysIter.hasNext())) { - MapScanResult res = map.scanIterator(client, keysIterPos); + MapScanResult res = map.scanIterator(client, keysIterPos); client = res.getRedisClient(); keysIter = res.getMap().entrySet().iterator(); keysIterPos = res.getPos(); @@ -86,9 +85,9 @@ abstract class RedissonMultiMapIterator implements Iterator { } while (keysIter.hasNext()) { - Entry e = keysIter.next(); - currentKey = (K) e.getKey().getObj(); - String name = e.getValue().getObj().toString(); + Entry e = keysIter.next(); + currentKey = (K) e.getKey(); + String name = e.getValue().toString(); valuesIter = getIterator(name); if (valuesIter.hasNext()) { return true; diff --git a/redisson/src/main/java/org/redisson/RedissonMultiMapKeysIterator.java b/redisson/src/main/java/org/redisson/RedissonMultiMapKeysIterator.java index 0c8ce6d2c..159e44948 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiMapKeysIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiMapKeysIterator.java @@ -18,7 +18,6 @@ package org.redisson; import java.util.Map.Entry; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ScanObjectEntry; /** * @@ -34,18 +33,18 @@ public class RedissonMultiMapKeysIterator extends RedissonBaseMapIterator this.map = map; } @Override - protected Object put(Entry entry, Object value) { - return map.put(entry.getKey().getObj(), value); + protected Object put(Entry entry, Object value) { + return map.put(entry.getKey(), value); } @Override - protected ScanResult> iterator(RedisClient client, long nextIterPos) { + protected ScanResult> iterator(RedisClient client, long nextIterPos) { return map.scanIterator(client, nextIterPos); } @Override - protected void remove(Entry value) { - map.fastRemove(value.getKey().getObj()); + protected void remove(Entry value) { + map.fastRemove(value.getKey()); } } diff --git a/redisson/src/main/java/org/redisson/RedissonMultimap.java b/redisson/src/main/java/org/redisson/RedissonMultimap.java index 4b2f20224..4ad509774 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonMultimap.java @@ -34,12 +34,11 @@ import org.redisson.api.RReadWriteLock; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandExecutor; import org.redisson.misc.Hash; @@ -301,8 +300,8 @@ public abstract class RedissonMultimap extends RedissonExpirable implement } - MapScanResult scanIterator(RedisClient client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); + MapScanResult scanIterator(RedisClient client, long startPos) { + RFuture> f = commandExecutor.readAsync(client, getName(), new CompositeCodec(codec, StringCodec.INSTANCE, codec), RedisCommands.HSCAN, getName(), startPos); return get(f); } @@ -316,8 +315,8 @@ public abstract class RedissonMultimap extends RedissonExpirable implement public Iterator iterator() { return new RedissonMultiMapKeysIterator(RedissonMultimap.this) { @Override - protected K getValue(java.util.Map.Entry entry) { - return (K) entry.getKey().getObj(); + protected K getValue(java.util.Map.Entry entry) { + return (K) entry.getKey(); } }; } diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 1911a0311..5bd33ee17 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -24,8 +24,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSet; @@ -36,13 +36,11 @@ import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.misc.RedissonPromise; @@ -394,8 +392,8 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o)); } - private ListScanResult scanIterator(RedisClient client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); + private ListScanResult scanIterator(RedisClient client, long startPos) { + RFuture> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); return get(f); } @@ -404,13 +402,13 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(client, nextIterPos); } @Override - protected void remove(ScanObjectEntry value) { - RedissonScoredSortedSet.this.remove((V)value.getObj()); + protected void remove(Object value) { + RedissonScoredSortedSet.this.remove((V)value); } }; diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 8f692d4b0..9250630af 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -31,10 +31,8 @@ import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.misc.Hash; @@ -94,13 +92,13 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { + public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { if (pattern == null) { - RFuture> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos); + RFuture> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos); return get(f); } - RFuture> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos, "MATCH", pattern); + RFuture> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern); return get(f); } @@ -109,13 +107,13 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(getName(), client, nextIterPos, pattern); } @Override - protected void remove(ScanObjectEntry value) { - RedissonSet.this.remove((V)value.getObj()); + protected void remove(Object value) { + RedissonSet.this.remove((V)value); } }; @@ -567,7 +565,7 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt } @Override - public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, + public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { throw new UnsupportedOperationException(); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 6e04903fa..41e6f6355 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -31,10 +31,8 @@ import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.mapreduce.RedissonCollectionMapReduce; @@ -123,13 +121,13 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { - RFuture> f = scanIteratorAsync(name, client, startPos, pattern); + public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { + RFuture> f = scanIteratorAsync(name, client, startPos, pattern); return get(f); } @Override - public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { + public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(startPos); params.add(System.currentTimeMillis()); @@ -137,7 +135,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< params.add(pattern); } - return commandExecutor.evalReadAsync(client, name, new ScanCodec(codec), RedisCommands.EVAL_ZSCAN, + return commandExecutor.evalReadAsync(client, name, codec, RedisCommands.EVAL_ZSCAN, "local result = {}; " + "local res; " + "if (#ARGV == 3) then " @@ -161,13 +159,13 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(getName(), client, nextIterPos, pattern); } @Override - protected void remove(ScanObjectEntry value) { - RedissonSetCache.this.remove((V)value.getObj()); + protected void remove(Object value) { + RedissonSetCache.this.remove((V)value); } }; diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index 9cea3350c..ccf191d31 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -30,7 +30,6 @@ import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -39,7 +38,6 @@ import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -167,7 +165,7 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(o)); } - private ListScanResult scanIterator(RedisClient client, long startPos, String pattern) { + private ListScanResult scanIterator(RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(System.currentTimeMillis()); params.add(startPos); @@ -176,7 +174,7 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R params.add(pattern); } - RFuture> f = commandExecutor.evalReadAsync(client, getName(), new MapScanCodec(codec), EVAL_SSCAN, + RFuture> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN, "local expireDate = 92233720368547758; " + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " + "if expireDateScore ~= false then " @@ -203,13 +201,13 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(client, nextIterPos, pattern); } @Override - protected void remove(ScanObjectEntry value) { - RedissonSetMultimapValues.this.remove((V)value.getObj()); + protected void remove(Object value) { + RedissonSetMultimapValues.this.remove((V)value); } }; diff --git a/redisson/src/main/java/org/redisson/ScanIterator.java b/redisson/src/main/java/org/redisson/ScanIterator.java index 3531dda26..6dcb291a1 100644 --- a/redisson/src/main/java/org/redisson/ScanIterator.java +++ b/redisson/src/main/java/org/redisson/ScanIterator.java @@ -18,7 +18,6 @@ package org.redisson; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; /** * @@ -27,9 +26,9 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; */ public interface ScanIterator { - ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern); + ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern); - RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern); + RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern); boolean remove(Object value); diff --git a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java deleted file mode 100644 index 8ff933a05..000000000 --- a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.codec; - -import java.io.IOException; - -import org.redisson.client.handler.State; -import org.redisson.client.protocol.Decoder; -import org.redisson.client.protocol.Encoder; -import org.redisson.client.protocol.decoder.ScanObjectEntry; -import org.redisson.misc.Hash; -import org.redisson.misc.HashValue; - -import io.netty.buffer.ByteBuf; - -/** - * - * @author Nikita Koksharov - * - */ -public class MapScanCodec implements Codec { - - private final Codec delegate; - private final Codec mapValueCodec; - - public MapScanCodec(Codec delegate) { - this(delegate, null); - } - - public MapScanCodec(Codec delegate, Codec mapValueCodec) { - this.delegate = delegate; - this.mapValueCodec = mapValueCodec; - } - - @Override - public Decoder getValueDecoder() { - return delegate.getValueDecoder(); - } - - @Override - public Encoder getValueEncoder() { - return delegate.getValueEncoder(); - } - - @Override - public Decoder getMapValueDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - buf.markReaderIndex(); - long[] hash = Hash.hash128(buf); - buf.resetReaderIndex(); - Codec c = delegate; - if (mapValueCodec != null) { - c = mapValueCodec; - } - Object val = c.getMapValueDecoder().decode(buf, state); - return new ScanObjectEntry(new HashValue(hash), val); - } - }; - } - - @Override - public Encoder getMapValueEncoder() { - Codec c = delegate; - if (mapValueCodec != null) { - c = mapValueCodec; - } - - return c.getMapValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - buf.markReaderIndex(); - long[] hash = Hash.hash128(buf); - buf.resetReaderIndex(); - Object val = delegate.getMapKeyDecoder().decode(buf, state); - return new ScanObjectEntry(new HashValue(hash), val); - } - }; - } - - @Override - public Encoder getMapKeyEncoder() { - return delegate.getMapKeyEncoder(); - } - - @Override - public ClassLoader getClassLoader() { - return getClass().getClassLoader(); - } - -} diff --git a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java deleted file mode 100644 index f66f67eba..000000000 --- a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.codec; - -import java.io.IOException; - -import org.redisson.client.handler.State; -import org.redisson.client.protocol.Decoder; -import org.redisson.client.protocol.Encoder; -import org.redisson.client.protocol.decoder.ScanObjectEntry; -import org.redisson.misc.Hash; -import org.redisson.misc.HashValue; - -import io.netty.buffer.ByteBuf; - -/** - * - * @author Nikita Koksharov - * - */ -public class ScanCodec implements Codec { - - private final Codec delegate; - - public ScanCodec(Codec delegate) { - this.delegate = delegate; - } - - @Override - public Decoder getValueDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - buf.markReaderIndex(); - long[] hash = Hash.hash128(buf); - buf.resetReaderIndex(); - Object val = delegate.getValueDecoder().decode(buf, state); - return new ScanObjectEntry(new HashValue(hash), val); - } - }; - } - - @Override - public Encoder getValueEncoder() { - return delegate.getValueEncoder(); - } - - @Override - public Decoder getMapValueDecoder() { - return delegate.getMapValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return delegate.getMapValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return delegate.getMapKeyDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return delegate.getMapKeyEncoder(); - } - - @Override - public ClassLoader getClassLoader() { - return delegate.getClassLoader(); - } - -} diff --git a/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java b/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java index 1132d5bf9..64674248d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java +++ b/redisson/src/main/java/org/redisson/client/handler/PingConnectionHandler.java @@ -57,7 +57,6 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter { @Override public void run(Timeout timeout) throws Exception { if (future.cancel(false) || !future.isSuccess()) { - System.out.println("closed!!! " + future + " " + connection.getChannel()); ctx.channel().close(); } else { sendPing(ctx); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java deleted file mode 100644 index 687b77c9e..000000000 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol.decoder; - -import org.redisson.misc.HashValue; - -/** - * - * @author Nikita Koksharov - * - */ -public class ScanObjectEntry { - - private final HashValue hash; - private final Object obj; - - public ScanObjectEntry(HashValue hash, Object obj) { - this.hash = hash; - this.obj = obj; - } - - public HashValue getHash() { - return hash; - } - - public Object getObj() { - return obj; - } - -} diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index a0404a3e5..778a157cb 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -29,9 +29,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.redisson.ScanResult; import org.redisson.RedissonReference; import org.redisson.RedissonShutdownException; +import org.redisson.ScanResult; import org.redisson.SlotCallback; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -55,7 +55,6 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; @@ -1005,8 +1004,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { Map oldMap = ((MapScanResult) o).getMap(); Map map = tryHandleReference(oldMap); if (map != oldMap) { - MapScanResult newScanResult - = new MapScanResult(scanResult.getPos(), map); + MapScanResult newScanResult + = new MapScanResult(scanResult.getPos(), map); newScanResult.setRedisClient(scanResult.getRedisClient()); return (T) newScanResult; } else { @@ -1023,10 +1022,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { } else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) { ScoredEntry se = ((ScoredEntry) o); return (T) new ScoredEntry(se.getScore(), fromReference(se.getValue())); - } else if (o instanceof ScanObjectEntry) { - ScanObjectEntry keyScan = (ScanObjectEntry) o; - Object obj = tryHandleReference0(keyScan.getObj()); - return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getHash(), obj) : o; } else if (o instanceof Map.Entry) { Map.Entry old = (Map.Entry) o; Object key = tryHandleReference0(old.getKey()); diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 7a10046f0..da6a766e3 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -60,12 +60,10 @@ import org.redisson.api.RTopic; import org.redisson.api.listener.MessageListener; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.jcache.JMutableEntry.Action; import org.redisson.jcache.configuration.JCacheConfiguration; @@ -73,7 +71,6 @@ import org.redisson.misc.Hash; import io.netty.buffer.ByteBuf; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; /** * JCache implementation @@ -2084,9 +2081,9 @@ public class JCache extends RedissonObject implements Cache { cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime); } - MapScanResult scanIterator(String name, RedisClient client, long startPos) { - RFuture> f - = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); + MapScanResult scanIterator(String name, RedisClient client, long startPos) { + RFuture> f + = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos); try { return get(f); } catch (Exception e) { @@ -2097,22 +2094,22 @@ public class JCache extends RedissonObject implements Cache { protected Iterator keyIterator() { return new RedissonBaseMapIterator() { @Override - protected K getValue(Map.Entry entry) { - return (K) entry.getKey().getObj(); + protected K getValue(Map.Entry entry) { + return (K) entry.getKey(); } @Override - protected void remove(java.util.Map.Entry value) { + protected void remove(java.util.Map.Entry value) { throw new UnsupportedOperationException(); } @Override - protected Object put(java.util.Map.Entry entry, Object value) { + protected Object put(java.util.Map.Entry entry, Object value) { throw new UnsupportedOperationException(); } @Override - protected ScanResult> iterator(RedisClient client, + protected ScanResult> iterator(RedisClient client, long nextIterPos) { return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos); } @@ -2416,32 +2413,32 @@ public class JCache extends RedissonObject implements Cache { checkNotClosed(); return new RedissonBaseMapIterator>() { @Override - protected Cache.Entry getValue(Map.Entry entry) { + protected Cache.Entry getValue(Map.Entry entry) { cacheManager.getStatBean(JCache.this).addHits(1); Long accessTimeout = getAccessTimeout(); - JCacheEntry je = new JCacheEntry((K) entry.getKey().getObj(), (V) entry.getValue().getObj()); + JCacheEntry je = new JCacheEntry((K) entry.getKey(), (V) entry.getValue()); if (accessTimeout == 0) { remove(); } else if (accessTimeout != -1) { - write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, encodeMapKey(entry.getKey().getObj())); + write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, encodeMapKey(entry.getKey())); } return je; } @Override - protected void remove(Map.Entry entry) { - JCache.this.remove((K) entry.getKey().getObj()); + protected void remove(Map.Entry entry) { + JCache.this.remove((K) entry.getKey()); } @Override - protected Object put(java.util.Map.Entry entry, Object value) { + protected Object put(java.util.Map.Entry entry, Object value) { throw new UnsupportedOperationException(); } @Override - protected ScanResult> iterator(RedisClient client, + protected ScanResult> iterator(RedisClient client, long nextIterPos) { return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/reactive/MapReactive.java b/redisson/src/main/java/org/redisson/reactive/MapReactive.java index 5cf06e434..d436cf422 100644 --- a/redisson/src/main/java/org/redisson/reactive/MapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/MapReactive.java @@ -18,7 +18,6 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; /** * @@ -29,7 +28,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; */ interface MapReactive { - Publisher> scanIteratorReactive(RedisClient client, long startPos); + Publisher> scanIteratorReactive(RedisClient client, long startPos); Publisher put(K key, V value); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index a896d0ca4..44f3d299b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -31,7 +31,6 @@ import org.redisson.api.RMapReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; import org.redisson.eviction.EvictionScheduler; @@ -202,10 +201,10 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im } @Override - public Publisher> scanIteratorReactive(final RedisClient client, final long startPos) { - return reactive(new Supplier>>() { + public Publisher> scanIteratorReactive(final RedisClient client, final long startPos) { + return reactive(new Supplier>>() { @Override - public RFuture> get() { + public RFuture> get() { return ((RedissonMapCache)mapCache).scanIteratorAsync(getName(), client, startPos, null); } }); @@ -330,8 +329,8 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im public Publisher valueIterator() { return new RedissonMapReactiveIterator(this) { @Override - V getValue(Entry entry) { - return (V) entry.getValue().getObj(); + V getValue(Entry entry) { + return (V) entry.getValue(); } }.stream(); } @@ -340,8 +339,8 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im public Publisher keyIterator() { return new RedissonMapReactiveIterator(this) { @Override - K getValue(Entry entry) { - return (K) entry.getKey().getObj(); + K getValue(Entry entry) { + return (K) entry.getKey(); } }.stream(); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index 72ee38a5c..d5894d649 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -28,10 +28,8 @@ import org.redisson.api.RMapAsync; import org.redisson.api.RMapReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.BiFunction; @@ -292,8 +290,8 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme }); } - public Publisher> scanIteratorReactive(RedisClient client, long startPos) { - return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); + public Publisher> scanIteratorReactive(RedisClient client, long startPos) { + return commandExecutor.readReactive(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos); } @Override @@ -305,8 +303,8 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme public Publisher valueIterator() { return new RedissonMapReactiveIterator(this) { @Override - V getValue(Entry entry) { - return (V) entry.getValue().getObj(); + V getValue(Entry entry) { + return (V) entry.getValue(); } }.stream(); } @@ -315,8 +313,8 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme public Publisher keyIterator() { return new RedissonMapReactiveIterator(this) { @Override - K getValue(Entry entry) { - return (K) entry.getKey().getObj(); + K getValue(Entry entry) { + return (K) entry.getKey(); } }.stream(); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java index 6aa4571c7..631c15636 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java @@ -23,7 +23,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -64,7 +63,7 @@ public class RedissonMapReactiveIterator { protected void nextValues() { final ReactiveSubscription m = this; - map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { + map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @Override public void onSubscribe(Subscription s) { @@ -72,7 +71,7 @@ public class RedissonMapReactiveIterator { } @Override - public void onNext(MapScanResult res) { + public void onNext(MapScanResult res) { if (currentIndex == 0) { client = null; nextIterPos = 0; @@ -82,7 +81,7 @@ public class RedissonMapReactiveIterator { client = res.getRedisClient(); nextIterPos = res.getPos(); - for (Entry entry : res.getMap().entrySet()) { + for (Entry entry : res.getMap().entrySet()) { M val = getValue(entry); m.onNext(val); currentIndex--; @@ -119,12 +118,12 @@ public class RedissonMapReactiveIterator { } - M getValue(final Entry entry) { - return (M)new AbstractMap.SimpleEntry((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) { + M getValue(final Entry entry) { + return (M)new AbstractMap.SimpleEntry((K)entry.getKey(), (V)entry.getValue()) { @Override public V setValue(V value) { - Publisher publisher = map.put((K) entry.getKey().getObj(), value); + Publisher publisher = map.put((K) entry.getKey(), value); return ((Stream)publisher).next().poll(); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index aa54a14cc..8f7311327 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -26,11 +26,9 @@ import org.redisson.api.RScoredSortedSetAsync; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; @@ -183,15 +181,15 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv }); } - private Publisher> scanIteratorReactive(RedisClient client, long startPos) { - return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); + private Publisher> scanIteratorReactive(RedisClient client, long startPos) { + return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); } @Override public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { + protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 5eb9d2e39..8067eec81 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -32,7 +32,6 @@ import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; import org.redisson.eviction.EvictionScheduler; @@ -102,10 +101,10 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple }); } - Publisher> scanIterator(final RedisClient client, final long startPos) { - return reactive(new Supplier>>() { + Publisher> scanIterator(final RedisClient client, final long startPos) { + return reactive(new Supplier>>() { @Override - public RFuture> get() { + public RFuture> get() { return ((ScanIterator)instance).scanIteratorAsync(getName(), client, startPos, null); } }); @@ -115,7 +114,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { + protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 5f44b6a31..b0cf7f496 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -28,10 +28,8 @@ import org.redisson.api.RSetAsync; import org.redisson.api.RSetReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; @@ -111,8 +109,8 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements }); } - private Publisher> scanIteratorReactive(RedisClient client, long startPos) { - return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos); + private Publisher> scanIteratorReactive(RedisClient client, long startPos) { + return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); } @Override @@ -256,7 +254,7 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { + protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java index 219ed111f..3694b7ff1 100644 --- a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -20,7 +20,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -49,7 +48,7 @@ public abstract class SetReactiveIterator extends Stream { protected void nextValues() { final ReactiveSubscription m = this; - scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { + scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @Override public void onSubscribe(Subscription s) { @@ -57,7 +56,7 @@ public abstract class SetReactiveIterator extends Stream { } @Override - public void onNext(ListScanResult res) { + public void onNext(ListScanResult res) { if (finished) { client = null; nextIterPos = 0; @@ -67,8 +66,8 @@ public abstract class SetReactiveIterator extends Stream { client = res.getRedisClient(); nextIterPos = res.getPos(); - for (ScanObjectEntry val : res.getValues()) { - m.onNext((V)val.getObj()); + for (Object val : res.getValues()) { + m.onNext((V)val); } if (res.getPos() == 0) { @@ -94,6 +93,6 @@ public abstract class SetReactiveIterator extends Stream { }); } - protected abstract Publisher> scanIteratorReactive(RedisClient client, long nextIterPos); + protected abstract Publisher> scanIteratorReactive(RedisClient client, long nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index 2d82ec049..b7ff246dd 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -38,7 +38,6 @@ import org.redisson.api.RMap; import org.redisson.client.RedisClient; import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.Hash; import org.redisson.misc.HashValue; @@ -116,15 +115,6 @@ public class BaseTransactionalMap { } } - private HashValue toValueHash(Object value) { - ByteBuf keyState = ((RedissonObject)map).encodeMapValue(value); - try { - return new HashValue(Hash.hash128(keyState)); - } finally { - keyState.release(); - } - } - public RFuture isExistsAsync() { if (deleted != null) { return RedissonPromise.newSucceededFuture(!deleted); @@ -202,21 +192,20 @@ public class BaseTransactionalMap { return result; } - protected MapScanResult scanIterator(String name, RedisClient client, + protected MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { - MapScanResult res = ((RedissonMap)map).scanIterator(name, client, startPos, pattern); + MapScanResult res = ((RedissonMap)map).scanIterator(name, client, startPos, pattern); Map newstate = new HashMap(state); - for (Iterator iterator = res.getMap().keySet().iterator(); iterator.hasNext();) { - ScanObjectEntry entry = iterator.next(); - MapEntry mapEntry = newstate.remove(entry.getHash()); + for (Iterator iterator = res.getMap().keySet().iterator(); iterator.hasNext();) { + Object entry = iterator.next(); + MapEntry mapEntry = newstate.remove(toKeyHash(entry)); if (mapEntry != null) { if (mapEntry == MapEntry.NULL) { iterator.remove(); continue; } - HashValue valueHash = toValueHash(mapEntry.getValue()); - res.getMap().put(entry, new ScanObjectEntry(valueHash, mapEntry.getValue())); + res.getMap().put(entry, mapEntry.getValue()); } } @@ -226,9 +215,7 @@ public class BaseTransactionalMap { continue; } - ScanObjectEntry key = new ScanObjectEntry(entry.getKey(), entry.getValue().getKey()); - ScanObjectEntry value = new ScanObjectEntry(toValueHash(entry.getValue().getValue()), entry.getValue().getValue()); - res.getMap().put(key, value); + res.getMap().put(entry.getValue().getKey(), entry.getValue().getValue()); } } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java index 0696f4ab6..35b7734f7 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java @@ -36,7 +36,6 @@ import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.Hash; import org.redisson.misc.HashValue; @@ -176,16 +175,16 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { return set.containsAsync(value); } - protected abstract ListScanResult scanIteratorSource(String name, RedisClient client, + protected abstract ListScanResult scanIteratorSource(String name, RedisClient client, long startPos, String pattern); - protected ListScanResult scanIterator(String name, RedisClient client, + protected ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { - ListScanResult res = scanIteratorSource(name, client, startPos, pattern); + ListScanResult res = scanIteratorSource(name, client, startPos, pattern); Map newstate = new HashMap(state); - for (Iterator iterator = res.getValues().iterator(); iterator.hasNext();) { - ScanObjectEntry entry = iterator.next(); - Object value = newstate.remove(entry.getHash()); + for (Iterator iterator = res.getValues().iterator(); iterator.hasNext();) { + Object entry = iterator.next(); + Object value = newstate.remove(toHash(entry)); if (value == NULL) { iterator.remove(); } @@ -196,7 +195,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { if (entry.getValue() == NULL) { continue; } - res.getValues().add(new ScanObjectEntry(entry.getKey(), entry.getValue())); + res.getValues().add(entry.getValue()); } } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java index b96ec366e..7bdc9fb46 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java @@ -32,7 +32,6 @@ import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; @@ -107,7 +106,7 @@ public class RedissonTransactionalMap extends RedissonMap { } @Override - public MapScanResult scanIterator(String name, RedisClient client, + public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { checkState(); return transactionalMap.scanIterator(name, client, startPos, pattern); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java index 67046801d..8cda78216 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java @@ -31,7 +31,6 @@ import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; @@ -156,7 +155,7 @@ public class RedissonTransactionalMapCache extends RedissonMapCache } @Override - public MapScanResult scanIterator(String name, RedisClient client, + public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { checkState(); return transactionalMap.scanIterator(name, client, startPos, pattern); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java index d9554f2ef..9bae8a507 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java @@ -29,7 +29,6 @@ import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; @@ -96,7 +95,7 @@ public class RedissonTransactionalSet extends RedissonSet { } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { + public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { checkState(); return transactionalSet.scanIterator(name, client, startPos, pattern); } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java index 2d5ae9737..de801a9de 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java @@ -28,7 +28,6 @@ import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; @@ -96,7 +95,7 @@ public class RedissonTransactionalSetCache extends RedissonSetCache { } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { + public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { checkState(); return transactionalSet.scanIterator(name, client, startPos, pattern); } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java index 4fbc1d4c4..502e0cf26 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java @@ -24,7 +24,6 @@ import org.redisson.api.RLock; import org.redisson.api.RSet; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.set.AddOperation; @@ -48,7 +47,7 @@ public class TransactionalSet extends BaseTransactionalSet { } @Override - protected ListScanResult scanIteratorSource(String name, RedisClient client, long startPos, + protected ListScanResult scanIteratorSource(String name, RedisClient client, long startPos, String pattern) { return ((RedissonSet)set).scanIterator(name, client, startPos, pattern); } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java index 0a7b65e55..6dbbb17aa 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java @@ -25,7 +25,6 @@ import org.redisson.api.RLock; import org.redisson.api.RSetCache; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.set.AddCacheOperation; @@ -49,7 +48,7 @@ public class TransactionalSetCache extends BaseTransactionalSet { } @Override - protected ListScanResult scanIteratorSource(String name, RedisClient client, long startPos, + protected ListScanResult scanIteratorSource(String name, RedisClient client, long startPos, String pattern) { return ((RedissonSetCache)set).scanIterator(name, client, startPos, pattern); } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 6960659b3..a5f3520eb 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -7,11 +7,12 @@ import static org.redisson.BaseTest.createInstance; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; +import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,8 +33,8 @@ import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.ClusterNode; import org.redisson.api.Node; -import org.redisson.api.NodeType; import org.redisson.api.Node.InfoSection; +import org.redisson.api.NodeType; import org.redisson.api.NodesGroup; import org.redisson.api.RFuture; import org.redisson.api.RMap; @@ -46,8 +47,6 @@ import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.Time; -import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.cluster.ClusterNodeInfo; import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.codec.JsonJacksonCodec; @@ -57,7 +56,6 @@ import org.redisson.connection.CRC16; import org.redisson.connection.ConnectionListener; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; -import org.redisson.misc.HashValue; public class RedissonTest { @@ -113,67 +111,6 @@ public class RedissonTest { localRedisson.shutdown(); } - @Test - public void testIteratorNotLooped() { - RedissonBaseIterator iter = new RedissonBaseIterator() { - int i; - @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { - i++; - if (i == 1) { - return new ListScanResult(13L, Collections.emptyList()); - } - if (i == 2) { - return new ListScanResult(0L, Collections.emptyList()); - } - Assert.fail(); - return null; - } - - @Override - protected void remove(Object value) { - } - - }; - - Assert.assertFalse(iter.hasNext()); - } - - @Test - public void testIteratorNotLooped2() { - RedissonBaseIterator iter = new RedissonBaseIterator() { - int i; - @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { - i++; - if (i == 1) { - return new ListScanResult(14L, Arrays.asList(new ScanObjectEntry(new HashValue(new long[]{1L}) , 1))); - } - if (i == 2) { - return new ListScanResult(7L, Collections.emptyList()); - } - if (i == 3) { - return new ListScanResult(0L, Collections.emptyList()); - } - if (i == 4) { - return new ListScanResult(14L, Collections.emptyList()); - } - Assert.fail(); - return null; - } - - @Override - protected void remove(ScanObjectEntry value) { - } - - }; - - Assert.assertTrue(iter.hasNext()); - assertThat(iter.next()).isEqualTo(1); - Assert.assertFalse(iter.hasNext()); - } - - @BeforeClass public static void beforeClass() throws IOException, InterruptedException { if (!RedissonRuntimeEnvironment.isTravis) { @@ -447,6 +384,121 @@ public class RedissonTest { slave2.stop(); } +// @Test + public void testFailoverInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); + RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); + RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave(); + RedisRunner slave4 = new RedisRunner().port(6903).randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1, slave4) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Thread.sleep(5000); + + Config config = new Config(); + config.useClusterServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); + + List> futures = new ArrayList>(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread() { + public void run() { + for (int i = 0; i < 2000; i++) { + RFuture f1 = redisson.getBucket("i" + i).getAsync(); + RFuture f2 = redisson.getBucket("i" + i).setAsync(""); + RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); + futures.add(f1); + futures.add(f2); + futures.add(f3); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (i % 100 == 0) { + System.out.println("step: " + i); + } + } + latch.countDown(); + }; + }; + t.start(); + t.join(1000); + + Set addresses = new HashSet<>(); + Collection masterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER); + for (ClusterNode clusterNode : masterNodes) { + addresses.add(clusterNode.getAddr()); + } + + master.stop(); + System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!"); + + Thread.sleep(TimeUnit.SECONDS.toMillis(80)); + + RedisProcess newMaster = null; + Collection newMasterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER); + for (ClusterNode clusterNode : newMasterNodes) { + if (!addresses.contains(clusterNode.getAddr())) { + newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get(); + break; + } + System.out.println("new-master: " + clusterNode.getAddr()); + } + + Thread.sleep(50000); + + newMaster.stop(); + + System.out.println("new master " + newMaster.getRedisServerAddressAndPort() + " has been stopped!"); + + Thread.sleep(TimeUnit.SECONDS.toMillis(70)); + + Thread.sleep(60000); + + latch.await(); + + int errors = 0; + int success = 0; + int readonlyErrors = 0; + + for (RFuture rFuture : futures) { + rFuture.awaitUninterruptibly(); + if (!rFuture.isSuccess()) { + errors++; + } else { + success++; + } + } + + System.out.println("errors " + errors + " success " + success); + + for (RFuture rFuture : futures) { + if (rFuture.isSuccess()) { + System.out.println(rFuture.isSuccess()); + } else { + rFuture.cause().printStackTrace(); + } + } + + assertThat(readonlyErrors).isZero(); + + redisson.shutdown(); + process.shutdown(); + } + @Test public void testReconnection() throws IOException, InterruptedException, TimeoutException { diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 21d1161b3..eff9e0ec8 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -7,11 +7,8 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -114,22 +111,22 @@ public class RedissonTopicTest { config.useSingleServer().setPingConnectionInterval(50); RedissonClient redisson = Redisson.create(config); - Set sentItems = new HashSet<>(); - Set receivedItems = new HashSet<>(); + int count = 1000; + CountDownLatch latch = new CountDownLatch(count); RTopic eventsTopic = redisson.getTopic("eventsTopic"); - eventsTopic.addListener((channel, msg) -> receivedItems.add(msg)); + eventsTopic.addListener((channel, msg) -> { + latch.countDown(); + }); - for(int i = 0; i<1000; i++){ + for(int i = 0; i