diff --git a/src/main/java/org/redisson/RedissonBaseMapIterator.java b/src/main/java/org/redisson/RedissonBaseMapIterator.java index 66cf04465..d493efb6d 100644 --- a/src/main/java/org/redisson/RedissonBaseMapIterator.java +++ b/src/main/java/org/redisson/RedissonBaseMapIterator.java @@ -64,7 +64,7 @@ abstract class RedissonBaseMapIterator implements Iterator { firstValues = convert(res.getMap()); } else { Map newValues = convert(res.getMap()); - if (newValues.equals(firstValues)) { + if (firstValues.entrySet().containsAll(newValues.entrySet())) { finished = true; free(firstValues); free(newValues); diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 8f3eeef1f..5a4d660a6 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -53,7 +53,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown public void await() throws InterruptedException { Future promise = subscribe(); try { - promise.await(); + get(promise); while (getCount() > 0) { // waiting for open state @@ -71,7 +71,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown public boolean await(long time, TimeUnit unit) throws InterruptedException { Future promise = subscribe(); try { - if (!promise.await(time, unit)) { + if (!await(promise, time, unit)) { return false; } diff --git a/src/main/java/org/redisson/RedissonGeo.java b/src/main/java/org/redisson/RedissonGeo.java index 7a62e165c..527892d25 100644 --- a/src/main/java/org/redisson/RedissonGeo.java +++ b/src/main/java/org/redisson/RedissonGeo.java @@ -45,12 +45,19 @@ import io.netty.util.concurrent.Future; public class RedissonGeo extends RedissonExpirable implements RGeo { + MultiDecoder> postitionDecoder; + MultiDecoder> distanceDecoder; + public RedissonGeo(CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); + postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true); + distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true); } public RedissonGeo(Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); + postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true); + distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true); } @Override @@ -119,7 +126,7 @@ public class RedissonGeo extends RedissonExpirable implements RGeo { params.add(getName()); params.addAll(Arrays.asList(members)); - MultiDecoder> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoPositionMapDecoder(params)); + MultiDecoder> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoPositionMapDecoder(params), true); RedisCommand> command = new RedisCommand>("GEOPOS", decoder, 2, ValueType.OBJECTS); return commandExecutor.readAsync(getName(), new ScoredCodec(codec), command, params.toArray()); } @@ -141,8 +148,7 @@ public class RedissonGeo extends RedissonExpirable implements RGeo { @Override public Future> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) { - MultiDecoder> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); - RedisCommand> command = new RedisCommand>("GEORADIUS", decoder); + RedisCommand> command = new RedisCommand>("GEORADIUS", distanceDecoder); return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHDIST"); } @@ -153,8 +159,7 @@ public class RedissonGeo extends RedissonExpirable implements RGeo { @Override public Future> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) { - MultiDecoder> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); - RedisCommand> command = new RedisCommand>("GEORADIUS", decoder); + RedisCommand> command = new RedisCommand>("GEORADIUS", postitionDecoder); return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHCOORD"); } @@ -175,8 +180,7 @@ public class RedissonGeo extends RedissonExpirable implements RGeo { @Override public Future> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit) { - MultiDecoder> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); - RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER", decoder, 2); + RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER", distanceDecoder, 2); return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHDIST"); } @@ -187,8 +191,7 @@ public class RedissonGeo extends RedissonExpirable implements RGeo { @Override public Future> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit) { - MultiDecoder> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); - RedisCommand> command = new RedisCommand>("GEORADIUSBYMEMBER", decoder, 2); + RedisCommand> command = new RedisCommand>("GEORADIUSBYMEMBER", postitionDecoder, 2); return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHCOORD"); } } diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index 91c1132da..8b2d33039 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -143,13 +143,13 @@ public class RedissonList extends RedissonExpirable implements RList { return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local items = redis.call('lrange', KEYS[1], 0, -1) " + "for i=1, #items do " + - "for j = 0, table.getn(ARGV), 1 do " + + "for j = 1, #ARGV, 1 do " + "if items[i] == ARGV[j] then " + "table.remove(ARGV, j) " + "end " + "end " + "end " + - "return table.getn(ARGV) == 0 and 1 or 0", + "return #ARGV == 0 and 1 or 0", Collections.singletonList(getName()), c.toArray()); } @@ -222,7 +222,7 @@ public class RedissonList extends RedissonExpirable implements RList { public Future removeAllAsync(Collection c) { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local v = 0 " + - "for i = 1, table.getn(ARGV), 1 do " + "for i = 1, #ARGV, 1 do " + "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 " + "then v = 1 end " +"end " @@ -246,11 +246,10 @@ public class RedissonList extends RedissonExpirable implements RList { "local changed = 0 " + "local items = redis.call('lrange', KEYS[1], 0, -1) " + "local i = 1 " - + "local s = table.getn(items) " - + "while i <= s do " + + "while i <= #items do " + "local element = items[i] " + "local isInAgrs = false " - + "for j = 0, table.getn(ARGV), 1 do " + + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == element then " + "isInAgrs = true " + "break " @@ -390,7 +389,7 @@ public class RedissonList extends RedissonExpirable implements RList { "local key = KEYS[1] " + "local obj = ARGV[1] " + "local items = redis.call('lrange', key, 0, -1) " + - "for i = #items, 0, -1 do " + + "for i = #items, 1, -1 do " + "if items[i] == obj then " + "return i - 1 " + "end " + diff --git a/src/main/java/org/redisson/RedissonListMultimapValues.java b/src/main/java/org/redisson/RedissonListMultimapValues.java index 1c6ed6d5f..abffc6e61 100644 --- a/src/main/java/org/redisson/RedissonListMultimapValues.java +++ b/src/main/java/org/redisson/RedissonListMultimapValues.java @@ -199,13 +199,13 @@ public class RedissonListMultimapValues extends RedissonExpirable implements + "return 0;" + "end; " + "local items = redis.call('lrange', KEYS[2], 0, -1);" + - "for i = 0, #items, 1 do " + - "for j = 2, table.getn(ARGV), 1 do " + "for i = 1, #items, 1 do " + + "for j = 2, #ARGV, 1 do " + "if ARGV[j] == items[i] " + "then table.remove(ARGV, j) end " + "end; " + "end;" - + "return table.getn(ARGV) == 2 and 1 or 0; ", + + "return #ARGV == 2 and 1 or 0; ", Arrays.asList(timeoutSetName, getName()), args.toArray()); } @@ -340,7 +340,7 @@ public class RedissonListMultimapValues extends RedissonExpirable implements "local changed = 0; " + "local s = redis.call('lrange', KEYS[2], 0, -1); " - + "local i = 0; " + + "local i = 1; " + "while i <= #s do " + "local element = s[i]; " + "local isInAgrs = false; " @@ -508,7 +508,7 @@ public class RedissonListMultimapValues extends RedissonExpirable implements + "end; " + "local items = redis.call('lrange', KEYS[1], 0, -1) " + - "for i = #items, 0, -1 do " + + "for i = #items, 1, -1 do " + "if items[i] == ARGV[1] then " + "return i - 1 " + "end " + diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 3bc14bf9b..eaf060496 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -110,7 +110,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } Future future = subscribe(); - future.sync(); + get(future); try { while (true) { @@ -229,7 +229,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } Future future = subscribe(); - if (!future.await(time, TimeUnit.MILLISECONDS)) { + if (!await(future, time, TimeUnit.MILLISECONDS)) { future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index cb8c8b54a..e72d4b074 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -107,7 +107,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { public Future containsValueAsync(Object value) { return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local s = redis.call('hvals', KEYS[1]);" + - "for i = 0, table.getn(s), 1 do " + "for i = 1, #s, 1 do " + "if ARGV[1] == s[i] then " + "return 1 " + "end " diff --git a/src/main/java/org/redisson/RedissonObject.java b/src/main/java/org/redisson/RedissonObject.java index 7e12d8ac8..6cc3d6000 100644 --- a/src/main/java/org/redisson/RedissonObject.java +++ b/src/main/java/org/redisson/RedissonObject.java @@ -15,6 +15,8 @@ */ package org.redisson; +import java.util.concurrent.TimeUnit; + import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; @@ -45,6 +47,10 @@ abstract class RedissonObject implements RObject { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } + protected boolean await(Future future, long timeout, TimeUnit timeoutUnit) throws InterruptedException { + return commandExecutor.await(future, timeout, timeoutUnit); + } + protected V get(Future future) { return commandExecutor.get(future); } diff --git a/src/main/java/org/redisson/RedissonScoredSortedSet.java b/src/main/java/org/redisson/RedissonScoredSortedSet.java index 188460143..01875ae2b 100644 --- a/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -280,13 +280,13 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc public Future containsAllAsync(Collection c) { return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local s = redis.call('zrange', KEYS[1], 0, -1);" + - "for i = 0, table.getn(s), 1 do " + - "for j = 0, table.getn(ARGV), 1 do " + "for i = 1, #s, 1 do " + + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == s[i] " + "then table.remove(ARGV, j) end " + "end; " + "end;" - + "return table.getn(ARGV) == 0 and 1 or 0; ", + + "return #ARGV == 0 and 1 or 0; ", Collections.singletonList(getName()), c.toArray()); } @@ -316,11 +316,11 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local changed = 0 " + "local s = redis.call('zrange', KEYS[1], 0, -1) " - + "local i = 0 " - + "while i <= table.getn(s) do " + + "local i = 1 " + + "while i <= #s do " + "local element = s[i] " + "local isInAgrs = false " - + "for j = 0, table.getn(ARGV), 1 do " + + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == element then " + "isInAgrs = true " + "break " diff --git a/src/main/java/org/redisson/RedissonSemaphore.java b/src/main/java/org/redisson/RedissonSemaphore.java index e29a4e0a0..a23cd966b 100644 --- a/src/main/java/org/redisson/RedissonSemaphore.java +++ b/src/main/java/org/redisson/RedissonSemaphore.java @@ -70,7 +70,8 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { return; } - Future future = subscribe().sync(); + Future future = subscribe(); + get(future); try { while (true) { if (tryAcquire(permits)) { @@ -113,7 +114,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { long time = unit.toMillis(waitTime); Future future = subscribe(); - if (!future.await(time, TimeUnit.MILLISECONDS)) { + if (!await(future, time, TimeUnit.MILLISECONDS)) { return false; } diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index 06b854f95..d2e728195 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Set; import org.redisson.client.codec.Codec; @@ -168,13 +167,13 @@ public class RedissonSet extends RedissonExpirable implements RSet { public Future containsAllAsync(Collection c) { return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local s = redis.call('smembers', KEYS[1]);" + - "for i = 0, table.getn(s), 1 do " + - "for j = 0, table.getn(ARGV), 1 do " + "for i = 1, #s, 1 do " + + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == s[i] " + "then table.remove(ARGV, j) end " + "end; " + "end;" - + "return table.getn(ARGV) == 0 and 1 or 0; ", + + "return #ARGV == 0 and 1 or 0; ", Collections.singletonList(getName()), c.toArray()); } @@ -205,11 +204,11 @@ public class RedissonSet extends RedissonExpirable implements RSet { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local changed = 0 " + "local s = redis.call('smembers', KEYS[1]) " - + "local i = 0 " - + "while i <= table.getn(s) do " + + "local i = 1 " + + "while i <= #s do " + "local element = s[i] " + "local isInAgrs = false " - + "for j = 0, table.getn(ARGV), 1 do " + + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == element then " + "isInAgrs = true " + "break " @@ -229,7 +228,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { public Future removeAllAsync(Collection c) { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local v = 0 " + - "for i = 1, table.getn(ARGV), 1 do " + "for i = 1, #ARGV, 1 do " + "if redis.call('srem', KEYS[1], ARGV[i]) == 1 " + "then v = 1 end " +"end " diff --git a/src/main/java/org/redisson/RedissonSetCache.java b/src/main/java/org/redisson/RedissonSetCache.java index 124fbfbc9..a4c1e13b6 100644 --- a/src/main/java/org/redisson/RedissonSetCache.java +++ b/src/main/java/org/redisson/RedissonSetCache.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -336,14 +335,14 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< public Future containsAllAsync(Collection c) { return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local s = redis.call('hvals', KEYS[1]);" + - "for i = 0, table.getn(s), 1 do " + - "for j = 0, table.getn(ARGV), 1 do " + "for i = 1, #s, 1 do " + + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == s[i] then " + "table.remove(ARGV, j) " + "end " + "end; " + "end;" - + "return table.getn(ARGV) == 0 and 1 or 0; ", + + "return #ARGV == 0 and 1 or 0; ", Collections.singletonList(getName()), c.toArray()); } diff --git a/src/main/java/org/redisson/RedissonSetMultimapValues.java b/src/main/java/org/redisson/RedissonSetMultimapValues.java index 36d0fefee..c3546e05e 100644 --- a/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -251,13 +251,13 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R + "return 0;" + "end; " + "local s = redis.call('smembers', KEYS[2]);" + - "for i = 0, table.getn(s), 1 do " + - "for j = 2, table.getn(ARGV), 1 do " + "for i = 1, #s, 1 do " + + "for j = 2, #ARGV, 1 do " + "if ARGV[j] == s[i] " + "then table.remove(ARGV, j) end " + "end; " + "end;" - + "return table.getn(ARGV) == 2 and 1 or 0; ", + + "return #ARGV == 2 and 1 or 0; ", Arrays.asList(timeoutSetName, getName()), args.toArray()); } @@ -307,11 +307,11 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R "local changed = 0 " + "local s = redis.call('smembers', KEYS[2]) " - + "local i = 0 " - + "while i <= table.getn(s) do " + + "local i = 1 " + + "while i <= #s do " + "local element = s[i] " + "local isInAgrs = false " - + "for j = 2, table.getn(ARGV), 1 do " + + "for j = 2, #ARGV, 1 do " + "if ARGV[j] == element then " + "isInAgrs = true " + "break " @@ -350,7 +350,7 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R + "end; " + "local v = 0 " + - "for i = 2, table.getn(ARGV), 1 do " + "for i = 2, #ARGV, 1 do " + "if redis.call('srem', KEYS[2], ARGV[i]) == 1 " + "then v = 1 end " +"end " diff --git a/src/main/java/org/redisson/RedissonSubList.java b/src/main/java/org/redisson/RedissonSubList.java index 3306d83c3..759200eef 100644 --- a/src/main/java/org/redisson/RedissonSubList.java +++ b/src/main/java/org/redisson/RedissonSubList.java @@ -104,13 +104,13 @@ public class RedissonSubList extends RedissonList implements RList { "local toIndex = table.remove(ARGV, 2);" + "local items = redis.call('lrange', KEYS[1], tonumber(fromIndex), tonumber(toIndex)) " + "for i=1, #items do " + - "for j = 0, #ARGV, 1 do " + + "for j = 1, #ARGV, 1 do " + "if items[i] == ARGV[j] then " + "table.remove(ARGV, j) " + "end " + "end " + "end " + - "return table.getn(ARGV) == 0 and 1 or 0", + "return #ARGV == 0 and 1 or 0", Collections.singletonList(getName()), params.toArray()); } @@ -203,11 +203,10 @@ public class RedissonSubList extends RedissonList implements RList { "local toIndex = table.remove(ARGV, 2);" + "local items = redis.call('lrange', KEYS[1], fromIndex, toIndex) " + "local i = 1 " - + "local s = table.getn(items) " - + "while i <= s do " + + "while i <= #items do " + "local element = items[i] " + "local isInAgrs = false " - + "for j = 0, table.getn(ARGV), 1 do " + + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == element then " + "isInAgrs = true " + "break " diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 4f6c49dc4..fc98647fd 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -15,6 +15,7 @@ */ package org.redisson.client; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; @@ -111,21 +112,34 @@ public class RedisConnection implements RedisCommands { return redisClient; } - public R await(Future cmd) { - // TODO change connectTimeout to timeout - if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) { - Promise promise = (Promise)cmd; - RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); - promise.setFailure(ex); - throw ex; - } - if (!cmd.isSuccess()) { - if (cmd.cause() instanceof RedisException) { - throw (RedisException) cmd.cause(); + public R await(Future future) { + final CountDownLatch l = new CountDownLatch(1); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + l.countDown(); + } + }); + + try { + // TODO change connectTimeout to timeout + if (!l.await(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) { + Promise promise = (Promise)future; + RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); + promise.setFailure(ex); + throw ex; + } + if (!future.isSuccess()) { + if (future.cause() instanceof RedisException) { + throw (RedisException) future.cause(); + } + throw new RedisException("Unexpected exception while processing command", future.cause()); } - throw new RedisException("Unexpected exception while processing command", cmd.cause()); + return future.getNow(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; } - return cmd.getNow(); } public T sync(RedisStrictCommand command, Object ... params) { diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index f0b5ce652..64113ad5d 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -173,7 +173,7 @@ public class CommandDecoder extends ReplayingDecoder { if (code == '+') { String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); in.skipBytes(2); - + handleResult(data, parts, result, false, channel); } else if (code == '-') { String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); @@ -206,9 +206,7 @@ public class CommandDecoder extends ReplayingDecoder { } } } else if (code == ':') { - String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); - in.skipBytes(2); - Object result = Long.valueOf(status); + Long result = readLong(in); handleResult(data, parts, result, false, channel); } else if (code == '$') { ByteBuf buf = readBytes(in); diff --git a/src/main/java/org/redisson/client/handler/CommandsQueue.java b/src/main/java/org/redisson/client/handler/CommandsQueue.java index ff65b9d4c..96446c90e 100644 --- a/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -23,10 +23,10 @@ import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommandHolder; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.util.AttributeKey; import io.netty.util.internal.PlatformDependent; @@ -37,7 +37,7 @@ import io.netty.util.internal.PlatformDependent; * @author Nikita Koksharov * */ -public class CommandsQueue extends ChannelDuplexHandler { +public class CommandsQueue extends ChannelOutboundHandlerAdapter { public static final AttributeKey CURRENT_COMMAND = AttributeKey.valueOf("promise"); diff --git a/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java index 413694b33..4bd04c8ee 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java @@ -27,6 +27,10 @@ public class FlatNestedMultiDecoder extends NestedMultiDecoder { super(firstDecoder, secondDecoder); } + public FlatNestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder, boolean handleEmpty) { + super(firstDecoder, secondDecoder, handleEmpty); + } + @Override public Object decode(ByteBuf buf, State state) throws IOException { return firstDecoder.decode(buf, state); diff --git a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java index c2a5f5fdd..43b5266ac 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java @@ -58,19 +58,27 @@ public class NestedMultiDecoder implements MultiDecoder { protected final MultiDecoder firstDecoder; protected final MultiDecoder secondDecoder; private MultiDecoder thirdDecoder; + private boolean handleEmpty; public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { - this.firstDecoder = firstDecoder; - this.secondDecoder = secondDecoder; + this(firstDecoder, secondDecoder, false); + } + + public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder, boolean handleEmpty) { + this(firstDecoder, secondDecoder, null, handleEmpty); } public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder, MultiDecoder thirdDecoder) { + this(firstDecoder, secondDecoder, thirdDecoder, false); + } + + public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder, MultiDecoder thirdDecoder, boolean handleEmpty) { this.firstDecoder = firstDecoder; this.secondDecoder = secondDecoder; this.thirdDecoder = thirdDecoder; + this.handleEmpty = handleEmpty; } - @Override public Object decode(ByteBuf buf, State state) throws IOException { DecoderState ds = getDecoder(state); @@ -121,7 +129,7 @@ public class NestedMultiDecoder implements MultiDecoder { @Override public Object decode(List parts, State state) { - if (parts.isEmpty() && state.getDecoderState() == null) { + if (parts.isEmpty() && state.getDecoderState() == null && handleEmpty) { MultiDecoder decoder = secondDecoder; if (thirdDecoder != null) { decoder = thirdDecoder; diff --git a/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/src/main/java/org/redisson/command/CommandAsyncExecutor.java index cddeb2a8b..a218b8f19 100644 --- a/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -18,6 +18,7 @@ package org.redisson.command; import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import org.redisson.SlotCallback; import org.redisson.client.RedisException; @@ -38,6 +39,8 @@ public interface CommandAsyncExecutor { RedisException convertException(Future future); + boolean await(Future future, long timeout, TimeUnit timeoutUnit) throws InterruptedException; + V get(Future future); Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params); diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index ab0b9e18b..cf1a506a9 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -22,9 +22,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.RedisClientResult; import org.redisson.RedissonShutdownException; @@ -82,13 +84,38 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public V get(Future future) { - future.awaitUninterruptibly(); + final CountDownLatch l = new CountDownLatch(1); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + l.countDown(); + } + }); + try { + l.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // commented out due to blocking issues up to 200 ms per minute for each thread + // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } throw convertException(future); } + @Override + public boolean await(Future future, long timeout, TimeUnit timeoutUnit) throws InterruptedException { + final CountDownLatch l = new CountDownLatch(1); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + l.countDown(); + } + }); + return l.await(timeout, timeoutUnit); + } + @Override public Future readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 00c1c6886..87dc8a594 100644 --- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -153,7 +153,7 @@ abstract class ConnectionPool { } } - StringBuilder errorMsg = new StringBuilder("Publish/Subscribe connection pool exhausted! All connections are busy. Try to increase Publish/Subscribe connection pool size."); + StringBuilder errorMsg = new StringBuilder("Connection pool exhausted! All connections are busy. Increase connection pool size."); // if (!freezed.isEmpty()) { // errorMsg.append(" Disconnected hosts: " + freezed); // } diff --git a/src/test/java/org/redisson/RedissonMapCacheTest.java b/src/test/java/org/redisson/RedissonMapCacheTest.java index 725ae166e..865bf7852 100644 --- a/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -6,9 +6,12 @@ import java.io.Serializable; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.hamcrest.MatcherAssert; @@ -200,6 +203,47 @@ public class RedissonMapCacheTest extends BaseTest { Assert.assertEquals(0, cache.size()); } + @Test + public void testIteratorRemoveHighVolume() throws InterruptedException { + RMapCache map = redisson.getMapCache("simpleMap"); + for (int i = 0; i < 10000; i++) { + map.put(i, i*10); + } + + int cnt = 0; + Iterator> iterator = map.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + iterator.remove(); + cnt++; + } + Assert.assertEquals(10000, cnt); + assertThat(map).isEmpty(); + Assert.assertEquals(0, map.size()); + } + + @Test + public void testIteratorRandomRemoveHighVolume() throws InterruptedException { + RMapCache map = redisson.getMapCache("simpleMap"); + for (int i = 0; i < 10000; i++) { + map.put(i, i*10); + } + + int cnt = 0; + int removed = 0; + Iterator> iterator = map.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + if (ThreadLocalRandom.current().nextBoolean()) { + iterator.remove(); + removed++; + } + cnt++; + } + Assert.assertEquals(10000, cnt); + assertThat(map.size()).isEqualTo(cnt - removed); + } + @Test public void testClearExpire() throws InterruptedException { RMapCache cache = redisson.getMapCache("simple");