Merge remote-tracking branch 'mrniko/master' into feature/travis-ci

# Conflicts:
#	src/main/java/org/redisson/RedissonList.java
#	src/main/java/org/redisson/RedissonSet.java
pull/509/head
jackygurui 9 years ago
commit 785a627a12

@ -64,7 +64,7 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
firstValues = convert(res.getMap());
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
if (newValues.equals(firstValues)) {
if (firstValues.entrySet().containsAll(newValues.entrySet())) {
finished = true;
free(firstValues);
free(newValues);

@ -53,7 +53,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
public void await() throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
promise.await();
get(promise);
while (getCount() > 0) {
// waiting for open state
@ -71,7 +71,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
public boolean await(long time, TimeUnit unit) throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
if (!promise.await(time, unit)) {
if (!await(promise, time, unit)) {
return false;
}

@ -45,12 +45,19 @@ import io.netty.util.concurrent.Future;
public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
MultiDecoder<Map<Object, Object>> postitionDecoder;
MultiDecoder<Map<Object, Object>> distanceDecoder;
public RedissonGeo(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
}
public RedissonGeo(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
}
@Override
@ -119,7 +126,7 @@ public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
params.add(getName());
params.addAll(Arrays.asList(members));
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoPositionMapDecoder(params));
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoPositionMapDecoder(params), true);
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder, 2, ValueType.OBJECTS);
return commandExecutor.readAsync(getName(), new ScoredCodec(codec), command, params.toArray());
}
@ -141,8 +148,7 @@ public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
@Override
public Future<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS", decoder);
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHDIST");
}
@ -153,8 +159,7 @@ public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
@Override
public Future<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS", decoder);
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHCOORD");
}
@ -175,8 +180,7 @@ public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
@Override
public Future<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER", decoder, 2);
RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER", distanceDecoder, 2);
return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHDIST");
}
@ -187,8 +191,7 @@ public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
@Override
public Future<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER", decoder, 2);
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER", postitionDecoder, 2);
return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHCOORD");
}
}

@ -143,13 +143,13 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i=1, #items do " +
"for j = 0, table.getn(ARGV), 1 do " +
"for j = 1, #ARGV, 1 do " +
"if items[i] == ARGV[j] then " +
"table.remove(ARGV, j) " +
"end " +
"end " +
"end " +
"return table.getn(ARGV) == 0 and 1 or 0",
"return #ARGV == 0 and 1 or 0",
Collections.<Object>singletonList(getName()), c.toArray());
}
@ -222,7 +222,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local v = 0 " +
"for i = 1, table.getn(ARGV), 1 do "
"for i = 1, #ARGV, 1 do "
+ "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
+ "then v = 1 end "
+"end "
@ -246,11 +246,10 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
"local changed = 0 " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
+ "local i = 1 "
+ "local s = table.getn(items) "
+ "while i <= s do "
+ "while i <= #items do "
+ "local element = items[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
@ -390,7 +389,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
"local items = redis.call('lrange', key, 0, -1) " +
"for i = #items, 0, -1 do " +
"for i = #items, 1, -1 do " +
"if items[i] == obj then " +
"return i - 1 " +
"end " +

@ -199,13 +199,13 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
+ "return 0;"
+ "end; " +
"local items = redis.call('lrange', KEYS[2], 0, -1);" +
"for i = 0, #items, 1 do " +
"for j = 2, table.getn(ARGV), 1 do "
"for i = 1, #items, 1 do " +
"for j = 2, #ARGV, 1 do "
+ "if ARGV[j] == items[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 2 and 1 or 0; ",
+ "return #ARGV == 2 and 1 or 0; ",
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
}
@ -340,7 +340,7 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
"local changed = 0; " +
"local s = redis.call('lrange', KEYS[2], 0, -1); "
+ "local i = 0; "
+ "local i = 1; "
+ "while i <= #s do "
+ "local element = s[i]; "
+ "local isInAgrs = false; "
@ -508,7 +508,7 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
+ "end; " +
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i = #items, 0, -1 do " +
"for i = #items, 1, -1 do " +
"if items[i] == ARGV[1] then " +
"return i - 1 " +
"end " +

@ -110,7 +110,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
Future<RedissonLockEntry> future = subscribe();
future.sync();
get(future);
try {
while (true) {
@ -229,7 +229,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
Future<RedissonLockEntry> future = subscribe();
if (!future.await(time, TimeUnit.MILLISECONDS)) {
if (!await(future, time, TimeUnit.MILLISECONDS)) {
future.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {

@ -107,7 +107,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public Future<Boolean> containsValueAsync(Object value) {
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
"for i = 1, #s, 1 do "
+ "if ARGV[1] == s[i] then "
+ "return 1 "
+ "end "

@ -15,6 +15,8 @@
*/
package org.redisson;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
@ -45,6 +47,10 @@ abstract class RedissonObject implements RObject {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected boolean await(Future<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
return commandExecutor.await(future, timeout, timeoutUnit);
}
protected <V> V get(Future<V> future) {
return commandExecutor.get(future);
}

@ -280,13 +280,13 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local s = redis.call('zrange', KEYS[1], 0, -1);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
"for i = 1, #s, 1 do " +
"for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
+ "return #ARGV == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@ -316,11 +316,11 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local changed = 0 " +
"local s = redis.call('zrange', KEYS[1], 0, -1) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local i = 1 "
+ "while i <= #s do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "

@ -70,7 +70,8 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return;
}
Future<RedissonLockEntry> future = subscribe().sync();
Future<RedissonLockEntry> future = subscribe();
get(future);
try {
while (true) {
if (tryAcquire(permits)) {
@ -113,7 +114,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
long time = unit.toMillis(waitTime);
Future<RedissonLockEntry> future = subscribe();
if (!future.await(time, TimeUnit.MILLISECONDS)) {
if (!await(future, time, TimeUnit.MILLISECONDS)) {
return false;
}

@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import org.redisson.client.codec.Codec;
@ -168,13 +167,13 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local s = redis.call('smembers', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
"for i = 1, #s, 1 do " +
"for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
+ "return #ARGV == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@ -205,11 +204,11 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local changed = 0 " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local i = 1 "
+ "while i <= #s do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
@ -229,7 +228,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local v = 0 " +
"for i = 1, table.getn(ARGV), 1 do "
"for i = 1, #ARGV, 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "
+ "then v = 1 end "
+"end "

@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -336,14 +335,14 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
"for i = 1, #s, 1 do " +
"for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == s[i] then "
+ "table.remove(ARGV, j) "
+ "end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
+ "return #ARGV == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}

@ -251,13 +251,13 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
+ "return 0;"
+ "end; " +
"local s = redis.call('smembers', KEYS[2]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 2, table.getn(ARGV), 1 do "
"for i = 1, #s, 1 do " +
"for j = 2, #ARGV, 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 2 and 1 or 0; ",
+ "return #ARGV == 2 and 1 or 0; ",
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
}
@ -307,11 +307,11 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
"local changed = 0 " +
"local s = redis.call('smembers', KEYS[2]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local i = 1 "
+ "while i <= #s do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 2, table.getn(ARGV), 1 do "
+ "for j = 2, #ARGV, 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
@ -350,7 +350,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
+ "end; " +
"local v = 0 " +
"for i = 2, table.getn(ARGV), 1 do "
"for i = 2, #ARGV, 1 do "
+ "if redis.call('srem', KEYS[2], ARGV[i]) == 1 "
+ "then v = 1 end "
+"end "

@ -104,13 +104,13 @@ public class RedissonSubList<V> extends RedissonList<V> implements RList<V> {
"local toIndex = table.remove(ARGV, 2);" +
"local items = redis.call('lrange', KEYS[1], tonumber(fromIndex), tonumber(toIndex)) " +
"for i=1, #items do " +
"for j = 0, #ARGV, 1 do " +
"for j = 1, #ARGV, 1 do " +
"if items[i] == ARGV[j] then " +
"table.remove(ARGV, j) " +
"end " +
"end " +
"end " +
"return table.getn(ARGV) == 0 and 1 or 0",
"return #ARGV == 0 and 1 or 0",
Collections.<Object>singletonList(getName()), params.toArray());
}
@ -203,11 +203,10 @@ public class RedissonSubList<V> extends RedissonList<V> implements RList<V> {
"local toIndex = table.remove(ARGV, 2);" +
"local items = redis.call('lrange', KEYS[1], fromIndex, toIndex) "
+ "local i = 1 "
+ "local s = table.getn(items) "
+ "while i <= s do "
+ "while i <= #items do "
+ "local element = items[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "

@ -15,6 +15,7 @@
*/
package org.redisson.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
@ -111,21 +112,34 @@ public class RedisConnection implements RedisCommands {
return redisClient;
}
public <R> R await(Future<R> cmd) {
// TODO change connectTimeout to timeout
if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)cmd;
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.setFailure(ex);
throw ex;
}
if (!cmd.isSuccess()) {
if (cmd.cause() instanceof RedisException) {
throw (RedisException) cmd.cause();
public <R> R await(Future<R> future) {
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
l.countDown();
}
});
try {
// TODO change connectTimeout to timeout
if (!l.await(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)future;
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.setFailure(ex);
throw ex;
}
if (!future.isSuccess()) {
if (future.cause() instanceof RedisException) {
throw (RedisException) future.cause();
}
throw new RedisException("Unexpected exception while processing command", future.cause());
}
throw new RedisException("Unexpected exception while processing command", cmd.cause());
return future.getNow();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
return cmd.getNow();
}
public <T> T sync(RedisStrictCommand<T> command, Object ... params) {

@ -173,7 +173,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (code == '+') {
String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
handleResult(data, parts, result, false, channel);
} else if (code == '-') {
String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
@ -206,9 +206,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
} else if (code == ':') {
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
Object result = Long.valueOf(status);
Long result = readLong(in);
handleResult(data, parts, result, false, channel);
} else if (code == '$') {
ByteBuf buf = readBytes(in);

@ -23,10 +23,10 @@ import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent;
@ -37,7 +37,7 @@ import io.netty.util.internal.PlatformDependent;
* @author Nikita Koksharov
*
*/
public class CommandsQueue extends ChannelDuplexHandler {
public class CommandsQueue extends ChannelOutboundHandlerAdapter {
public static final AttributeKey<QueueCommand> CURRENT_COMMAND = AttributeKey.valueOf("promise");

@ -27,6 +27,10 @@ public class FlatNestedMultiDecoder<T> extends NestedMultiDecoder {
super(firstDecoder, secondDecoder);
}
public FlatNestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder, boolean handleEmpty) {
super(firstDecoder, secondDecoder, handleEmpty);
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return firstDecoder.decode(buf, state);

@ -58,19 +58,27 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
protected final MultiDecoder<Object> firstDecoder;
protected final MultiDecoder<Object> secondDecoder;
private MultiDecoder<Object> thirdDecoder;
private boolean handleEmpty;
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
this(firstDecoder, secondDecoder, false);
}
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder, boolean handleEmpty) {
this(firstDecoder, secondDecoder, null, handleEmpty);
}
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder, MultiDecoder<Object> thirdDecoder) {
this(firstDecoder, secondDecoder, thirdDecoder, false);
}
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder, MultiDecoder<Object> thirdDecoder, boolean handleEmpty) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
this.thirdDecoder = thirdDecoder;
this.handleEmpty = handleEmpty;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
DecoderState ds = getDecoder(state);
@ -121,7 +129,7 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty() && state.getDecoderState() == null) {
if (parts.isEmpty() && state.getDecoderState() == null && handleEmpty) {
MultiDecoder<?> decoder = secondDecoder;
if (thirdDecoder != null) {
decoder = thirdDecoder;

@ -18,6 +18,7 @@ package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.SlotCallback;
import org.redisson.client.RedisException;
@ -38,6 +39,8 @@ public interface CommandAsyncExecutor {
<V> RedisException convertException(Future<V> future);
boolean await(Future<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException;
<V> V get(Future<V> future);
<T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);

@ -22,9 +22,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedisClientResult;
import org.redisson.RedissonShutdownException;
@ -82,13 +84,38 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
l.countDown();
}
});
try {
l.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// commented out due to blocking issues up to 200 ms per minute for each thread
// future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
@Override
public boolean await(Future<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
l.countDown();
}
});
return l.await(timeout, timeoutUnit);
}
@Override
public <T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();

@ -153,7 +153,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
StringBuilder errorMsg = new StringBuilder("Publish/Subscribe connection pool exhausted! All connections are busy. Try to increase Publish/Subscribe connection pool size.");
StringBuilder errorMsg = new StringBuilder("Connection pool exhausted! All connections are busy. Increase connection pool size.");
// if (!freezed.isEmpty()) {
// errorMsg.append(" Disconnected hosts: " + freezed);
// }

@ -6,9 +6,12 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.hamcrest.MatcherAssert;
@ -200,6 +203,47 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertEquals(0, cache.size());
}
@Test
public void testIteratorRemoveHighVolume() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("simpleMap");
for (int i = 0; i < 10000; i++) {
map.put(i, i*10);
}
int cnt = 0;
Iterator<Entry<Integer, Integer>> iterator = map.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Integer, Integer> entry = iterator.next();
iterator.remove();
cnt++;
}
Assert.assertEquals(10000, cnt);
assertThat(map).isEmpty();
Assert.assertEquals(0, map.size());
}
@Test
public void testIteratorRandomRemoveHighVolume() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("simpleMap");
for (int i = 0; i < 10000; i++) {
map.put(i, i*10);
}
int cnt = 0;
int removed = 0;
Iterator<Entry<Integer, Integer>> iterator = map.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Integer, Integer> entry = iterator.next();
if (ThreadLocalRandom.current().nextBoolean()) {
iterator.remove();
removed++;
}
cnt++;
}
Assert.assertEquals(10000, cnt);
assertThat(map.size()).isEqualTo(cnt - removed);
}
@Test
public void testClearExpire() throws InterruptedException {
RMapCache<String, String> cache = redisson.getMapCache("simple");

Loading…
Cancel
Save