Merge pull request #176 from setronica-dev/rethrow_exception_on_op_failure

Use LUA scripts for transactional redis operations
pull/179/head
Nikita Koksharov 10 years ago
commit 2d2cdc14b6

@ -36,11 +36,7 @@ import io.netty.util.concurrent.Promise;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -312,6 +308,31 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
return dispatch(EVAL, output, args);
}
public <T> Future<T> evalR(V script, ScriptOutputType type, List<K> keys, List<?> values, List<?> rawValues) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(script.toString()).add(keys.size()).addKeys(keys);
for (Object value : values) {
args.addMapValue((V) value);
}
for (Object value : rawValues) {
if (value instanceof String) {
args.add((String) value);
} else if (value instanceof Integer) {
args.add((Integer) value);
} else if (value instanceof Long) {
args.add((Long) value);
} else if (value instanceof Double) {
args.add((Double) value);
} else if (value instanceof byte[]) {
args.add((byte[]) value);
} else {
throw new IllegalArgumentException("Unsupported raw value type: " + value.getClass());
}
}
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVAL, output, args);
}
public <T> Future<T> evalsha(String digest, ScriptOutputType type, List<K> keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(digest).add(keys.size()).addKeys(keys).addMapValues(values);

@ -10,6 +10,7 @@ package com.lambdaworks.redis;
* <li>{@link #INTEGER} 64-bit integer</li>
* <li>{@link #STATUS} status string</li>
* <li>{@link #VALUE} value</li>
* <li>{@link #MAPVALUE} typed value</li>
* <li>{@link #MULTI} of these types</li>.
* </ul>
*

@ -5,7 +5,9 @@ package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
/**
* 64-bit integer output, may be null.
@ -24,6 +26,6 @@ public class IntegerOutput<K, V> extends CommandOutput<K, V, Long> {
@Override
public void set(ByteBuffer bytes) {
output = null;
output = bytes == null ? null : new Long(decodeAscii(bytes));
}
}

@ -16,7 +16,7 @@ public class MapScanOutput<K, V> extends CommandOutput<K, V, MapScanResult<K, V>
@Override
public void set(ByteBuffer bytes) {
if (output.getPos() == null) {
output.setPos(((Number) codec.decodeValue(bytes)).longValue());
output.setPos(toLong(bytes));
} else {
if (counter % 2 == 0) {
output.addValue(codec.decodeMapValue(bytes));
@ -27,4 +27,9 @@ public class MapScanOutput<K, V> extends CommandOutput<K, V, MapScanResult<K, V>
counter++;
}
private Long toLong(ByteBuffer bytes) {
return bytes == null ? null : new Long(new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit()));
}
}

@ -14,10 +14,14 @@ public class ValueSetScanOutput<K, V> extends CommandOutput<K, V, ListScanResult
@Override
public void set(ByteBuffer bytes) {
if (output.getPos() == null) {
output.setPos(Long.valueOf(codec.decodeMapValue(bytes).toString()));
output.setPos(toLong(bytes));
} else {
output.addValue(codec.decodeMapValue(bytes));
}
}
private Long toLong(ByteBuffer bytes) {
return bytes == null ? null : new Long(new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit()));
}
}

@ -3,8 +3,10 @@
package com.lambdaworks.redis.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.StringUtil;
import java.util.concurrent.BlockingQueue;
@ -46,7 +48,7 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler {
try {
if (!input.isReadable()) return;
// System.out.println("in: " + input.toString(CharsetUtil.UTF_8));
// System.out.println("in: " + toHexString(input));
buffer.discardReadBytes();
buffer.writeBytes(input);
@ -62,11 +64,28 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler {
Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf);
// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8));
// System.out.println("out: " + toHexString(buf));
ctx.write(buf, promise);
}
private String toHexString(ByteBuf buf) {
final StringBuilder builder = new StringBuilder(buf.readableBytes() * 2);
buf.forEachByte(new ByteBufProcessor() {
@Override
public boolean process(byte value) throws Exception {
char b = (char) value;
if ((b < ' ' && b != '\n' && b != '\r') || b > '~') {
builder.append("\\x").append(StringUtil.byteToHexStringPadded(value));
} else {
builder.append(b);
}
return true;
}
});
return builder.toString();
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
while (true) {
Command<K, V, ?> cmd = queue.peek();

@ -24,6 +24,11 @@ import org.redisson.core.RAtomicLong;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import org.redisson.core.RScript;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
@ -49,26 +54,12 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override
public boolean compareAndSet(final long expect, final long update) {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
while (true) {
conn.watch(getName());
Long value = getLongSafe(conn);
if (value != expect) {
conn.unwatch();
return false;
}
conn.multi();
conn.set(getName(), update);
if (conn.exec().size() == 1) {
return true;
}
}
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('set', KEYS[1], ARGV[2]); return true else return false end",
RScript.ReturnType.BOOLEAN,
keys, Collections.EMPTY_LIST, Arrays.asList(expect, update));
}
@Override
@ -88,51 +79,22 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override
public long getAndAdd(final long delta) {
return connectionManager.write(getName(), new SyncOperation<Object, Long>() {
@Override
public Long execute(RedisConnection<Object, Object> conn) {
while (true) {
conn.watch(getName());
Long value = getLongSafe(conn);
conn.multi();
conn.set(getName(), value + delta);
if (conn.exec().size() == 1) {
return value;
}
}
}
});
}
private Long getLongSafe(RedisConnection<Object, Object> conn) {
Number n = (Number) conn.get(getName());
if (n != null) {
return n.longValue();
}
return 0L;
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], v + ARGV[1]); return tonumber(v)",
RScript.ReturnType.INTEGER,
keys, Collections.EMPTY_LIST, Collections.singletonList(delta));
}
@Override
public long getAndSet(final long newValue) {
return connectionManager.write(getName(), new SyncOperation<Object, Long>() {
@Override
public Long execute(RedisConnection<Object, Object> conn) {
while (true) {
conn.watch(getName());
Long value = getLongSafe(conn);
conn.multi();
conn.set(getName(), newValue);
if (conn.exec().size() == 1) {
return value;
}
}
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)",
RScript.ReturnType.INTEGER,
keys, Collections.EMPTY_LIST, Collections.singletonList(newValue));
}
@Override
@ -156,12 +118,12 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override
public void set(final long newValue) {
connectionManager.write(getName(), new ResultOperation<String, Object>() {
@Override
protected Future<String> execute(RedisAsyncConnection<Object, Object> async) {
return async.set(getName(), newValue);
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
new RedissonScript(connectionManager).evalR(
"redis.call('set', KEYS[1], ARGV[1])",
RScript.ReturnType.STATUS,
keys, Collections.EMPTY_LIST, Collections.singletonList(newValue));
}
public String toString() {

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -28,6 +29,7 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.core.RBlockingQueue;
import com.lambdaworks.redis.RedisConnection;
import org.redisson.core.RScript;
/**
* Offers blocking queue facilities through an intermediary
@ -97,49 +99,39 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public int drainTo(Collection<? super V> c) {
List<V> list = connectionManager.write(getName(), new SyncOperation<V, List<V>>() {
@Override
public List<V> execute(RedisConnection<Object, V> conn) {
while (true) {
conn.watch(getName());
conn.multi();
conn.lrange(getName(), 0, -1);
conn.ltrim(getName(), 0, -1);
List<Object> res = conn.exec();
if (res.size() == 2) {
List<V> items = (List<V>) res.get(0);
return items;
}
}
}
});
if (c == null) {
throw new NullPointerException();
}
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
List<V> list = new RedissonScript(connectionManager).eval(
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals",
RScript.ReturnType.MAPVALUELIST,
keys);
c.addAll(list);
return list.size();
}
@Override
public int drainTo(Collection<? super V> c, final int maxElements) {
List<V> list = connectionManager.write(getName(), new SyncOperation<V, List<V>>() {
@Override
public List<V> execute(RedisConnection<Object, V> conn) {
while (true) {
conn.watch(getName());
Long len = Math.min(conn.llen(getName()), maxElements);
if (len == 0) {
conn.unwatch();
return Collections.emptyList();
}
conn.multi();
conn.lrange(getName(), 0, len - 1);
conn.ltrim(getName(), len, -1);
List<Object> res = conn.exec();
if (res.size() == 2) {
List<V> items = (List<V>) res.get(0);
return items;
}
}
}
});
if (maxElements <= 0) {
return 0;
}
if (c == null) {
throw new NullPointerException();
}
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
List<V> list = new RedissonScript(connectionManager).evalR(
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
"return vals",
RScript.ReturnType.MAPVALUELIST,
keys, Collections.emptyList(), Collections.singletonList(maxElements));
c.addAll(list);
return list.size();
}

@ -15,23 +15,21 @@
*/
package org.redisson;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RScript;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.async.ResultOperation;
import org.redisson.async.SyncOperation;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RCountDownLatch;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
/**
* Distributed alternative to the {@link java.util.concurrent.CountDownLatch}
*
@ -195,24 +193,15 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
if (getCount() <= 0) {
return;
}
connectionManager.write(getName(), new SyncOperation<Object, Void>() {
@Override
public Void execute(RedisConnection<Object, Object> conn) {
Long val = conn.decr(getName());
if (val == 0) {
conn.multi();
conn.del(getName());
conn.publish(getChannelName(), zeroCountMessage);
if (conn.exec().size() != 2) {
throw new IllegalStateException();
}
} else if (val < 0) {
conn.del(getName());
}
return null;
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
new RedissonScript(connectionManager).evalR(
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" +
"return 'OK'",
RScript.ReturnType.STATUS,
keys, Collections.singletonList(zeroCountMessage), Collections.singletonList(getChannelName()));
}
private String getEntryName() {
@ -229,53 +218,42 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
private long getCountInner() {
Number val = connectionManager.read(getName(), new ResultOperation<Number, Number>() {
@Override
protected Future<Number> execute(RedisAsyncConnection<Object, Number> async) {
return async.get(getName());
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
Long val = new RedissonScript(connectionManager).eval(
"return redis.call('get', KEYS[1])",
RScript.ReturnType.INTEGER,
keys);
if (val == null) {
return 0;
}
return val.longValue();
return val;
}
@Override
public boolean trySetCount(final long count) {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
conn.watch(getName());
Number oldValue = (Number) conn.get(getName());
if (oldValue != null) {
conn.unwatch();
return false;
}
conn.multi();
conn.set(getName(), count);
conn.publish(getChannelName(), newCountMessage);
return conn.exec().size() == 2;
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[2]); redis.call('publish', ARGV[3], ARGV[1]); return true else return false end",
RScript.ReturnType.BOOLEAN,
keys, Collections.singletonList(newCountMessage), Arrays.asList(count, getChannelName()));
}
@Override
public boolean delete() {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
conn.multi();
conn.del(getName());
conn.publish(getChannelName(), zeroCountMessage);
if (conn.exec().size() != 2) {
throw new IllegalStateException();
}
return true;
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
Boolean deleted = new RedissonScript(connectionManager).evalR(
"if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end",
RScript.ReturnType.BOOLEAN,
keys, Collections.singletonList(newCountMessage), Collections.singletonList(getChannelName()));
if (!deleted) {
throw new IllegalStateException();
}
return true;
}
}

@ -15,26 +15,19 @@
*/
package org.redisson;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import org.redisson.async.AsyncOperation;
import org.redisson.async.OperationListener;
import org.redisson.async.ResultOperation;
import org.redisson.async.SyncOperation;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RList;
import org.redisson.core.RScript;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import java.util.*;
/**
* Distributed and concurrent implementation of {@link java.util.List}
@ -163,7 +156,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return connectionManager.writeAsync(getName(), new AsyncOperation<Object, Boolean>() {
@Override
public void execute(final Promise<Boolean> promise, RedisAsyncConnection<Object, Object> async) {
async.rpush((Object)getName(), c.toArray()).addListener(new OperationListener<Object, Boolean, Object>(promise, async, this) {
async.rpush((Object) getName(), c.toArray()).addListener(new OperationListener<Object, Boolean, Object>(promise, async, this) {
@Override
public void onOperationComplete(Future<Object> future) throws Exception {
promise.setSuccess(true);
@ -180,34 +173,35 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return false;
}
if (index < size()) {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
while (true) {
conn.watch(getName());
List<Object> tail = conn.lrange(getName(), index, size());
int first = 0;
int last = 0;
if (index == 0) {
first = size();// truncate the list
last = 0;
} else {
first = 0;
last = index - 1;
}
conn.multi();
conn.ltrim(getName(), first, last);
conn.rpush(getName(), coll.toArray());
conn.rpush(getName(), tail.toArray());
if (conn.exec().size() == 3) {
return true;
}
if (index == 0) { // prepend elements to list
final ArrayList<V> elemens = new ArrayList<V>(coll);
Collections.reverse(elemens);
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
conn.lpush(getName(), elemens.toArray());
return true;
}
}
});
});
}
// insert into middle of list
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return "OK".equals(new RedissonScript(connectionManager).evalR(
"local ind = table.remove(ARGV); " + // index is last parameter
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
"for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return 'OK'",
RScript.ReturnType.STATUS,
keys, new ArrayList<Object>(coll), Collections.singletonList(index)));
} else {
// append to list
return addAll(coll);
}
}
@ -302,21 +296,17 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public V set(final int index, final V element) {
checkIndex(index);
return connectionManager.write(getName(), new SyncOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
while (true) {
conn.watch(getName());
V prev = (V) conn.lindex(getName(), index);
conn.multi();
conn.lset(getName(), index, element);
if (conn.exec().size() == 1) {
return prev;
}
}
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"local v = redis.call('lindex', KEYS[1], ARGV[2]); " +
"redis.call('lset', KEYS[1], ARGV[2], ARGV[1]); " +
"return v",
RScript.ReturnType.VALUE,
keys, Collections.singletonList(element), Collections.singletonList(index)
);
}
@Override
@ -339,26 +329,25 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public V remove(final int index) {
checkIndex(index);
return connectionManager.write(getName(), new SyncOperation<Object, V>() {
@Override
public V execute(RedisConnection<Object, Object> conn) {
if (index == 0) {
if (index == 0) {
return connectionManager.write(getName(), new SyncOperation<Object, V>() {
@Override
public V execute(RedisConnection<Object, Object> conn) {
return (V) conn.lpop(getName());
}
while (true) {
conn.watch(getName());
V prev = (V) conn.lindex(getName(), index);
List<Object> tail = conn.lrange(getName(), index + 1, size());
conn.multi();
conn.ltrim(getName(), 0, index - 1);
conn.rpush(getName(), tail.toArray());
if (conn.exec().size() == 2) {
return prev;
}
}
}
});
});
}
// else
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"local tail = redis.call('lrange', KEYS[1], ARGV[1]);" +
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return v",
RScript.ReturnType.VALUE,
keys, Collections.emptyList(), Collections.singletonList(index));
}
@Override
@ -367,22 +356,15 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return -1;
}
int to = div(size(), batchSize);
for (int i = 0; i < to; i++) {
final int j = i;
List<Object> range = connectionManager.read(getName(), new ResultOperation<List<Object>, Object>() {
@Override
protected Future<List<Object>> execute(RedisAsyncConnection<Object, Object> async) {
return async.lrange(getName(), j*batchSize, j*batchSize + batchSize - 1);
}
});
int index = range.indexOf(o);
if (index != -1) {
return index + i*batchSize;
}
}
return -1;
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
Long index = new RedissonScript(connectionManager).eval(
"local s = redis.call('llen', KEYS[1]);" +
"for i = 0, s, 1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" +
"return -1",
RScript.ReturnType.INTEGER,
keys, o);
return index.intValue();
}
@Override
@ -391,24 +373,15 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return -1;
}
final int size = size();
int to = div(size, batchSize);
for (int i = 1; i <= to; i++) {
final int j = i;
final int startIndex = -i*batchSize;
List<Object> range = connectionManager.read(getName(), new ResultOperation<List<Object>, Object>() {
@Override
protected Future<List<Object>> execute(RedisAsyncConnection<Object, Object> async) {
return async.lrange(getName(), startIndex, size - (j-1)*batchSize);
}
});
int index = range.lastIndexOf(o);
if (index != -1) {
return Math.max(size + startIndex, 0) + index;
}
}
return -1;
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
Long index = new RedissonScript(connectionManager).eval(
"local s = redis.call('llen', KEYS[1]);" +
"for i = s, 0, -1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" +
"return -1",
RScript.ReturnType.INTEGER,
keys, o);
return index.intValue();
}
@Override

@ -15,97 +15,39 @@
*/
package org.redisson;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import org.redisson.async.SyncOperation;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RLock;
import org.redisson.core.RScript;
import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.async.ResultOperation;
import org.redisson.async.SyncOperation;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RLock;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.
* Implements reentrant lock.<br>
* Lock will be removed automatically if client disconnects.
*
* @author Nikita Koksharov
*
*/
public class RedissonLock extends RedissonObject implements RLock {
public static class LockValue implements Serializable {
private static final long serialVersionUID = -8895632286065689476L;
private UUID id;
private Long threadId;
// need for reentrant support
private int counter;
public class RedissonLock extends RedissonExpirable implements RLock {
public LockValue() {
}
public LockValue(UUID id, Long threadId) {
super();
this.id = id;
this.threadId = threadId;
}
public void decCounter() {
counter--;
}
public void incCounter() {
counter++;
}
public int getCounter() {
return counter;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((threadId == null) ? 0 : threadId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
LockValue other = (LockValue) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (threadId == null) {
if (other.threadId != null)
return false;
} else if (!threadId.equals(other.threadId))
return false;
return true;
}
}
public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30;
private static final ConcurrentMap<String, Timeout> refreshTaskMap = new ConcurrentHashMap<String, Timeout>();
protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);
private final UUID id;
@ -214,7 +156,6 @@ public class RedissonLock extends RedissonObject implements RLock {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
@ -224,7 +165,6 @@ public class RedissonLock extends RedissonObject implements RLock {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
@ -280,64 +220,65 @@ public class RedissonLock extends RedissonObject implements RLock {
}
private Long tryLockInner() {
final LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter();
return connectionManager.write(getName(), new SyncOperation<LockValue, Long>() {
Long ttlRemaining = tryLockInner(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS);
if (ttlRemaining == null) {
newRefreshTask();
}
return ttlRemaining;
}
private void newRefreshTask() {
if (refreshTaskMap.containsKey(getName())) {
return;
}
Timeout task = connectionManager.newTimeout(new TimerTask() {
@Override
public Long execute(RedisConnection<Object, LockValue> connection) {
Boolean res = connection.setnx(getName(), currentLock);
if (!res) {
connection.watch(getName());
LockValue lock = (LockValue) connection.get(getName());
if (lock != null && lock.equals(currentLock)) {
lock.incCounter();
connection.multi();
connection.set(getName(), lock);
if (connection.exec().size() == 1) {
return null;
}
}
connection.unwatch();
Long ttl = connection.pttl(getName());
return ttl;
}
return null;
public void run(Timeout timeout) throws Exception {
expire(internalLockLeaseTime, TimeUnit.MILLISECONDS);
refreshTaskMap.remove(getName());
newRefreshTask(); // reschedule itself
}
});
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (refreshTaskMap.putIfAbsent(getName(), task) != null) {
task.cancel();
}
}
private Long tryLockInner(final long leaseTime, final TimeUnit unit) {
final LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter();
/**
* Stop refresh timer
* @return true if timer was stopped successfully
*/
private boolean stopRefreshTask() {
boolean returnValue =false;
Timeout task = refreshTaskMap.get(getName());
if (task != null) {
returnValue = task.cancel();
refreshTaskMap.remove(getName());
}
return returnValue;
}
return connectionManager.write(getName(), new SyncOperation<Object, Long>() {
@Override
public Long execute(RedisConnection<Object, Object> connection) {
long time = unit.toMillis(leaseTime);
String res = connection.setexnx(getName(), currentLock, time);
if ("OK".equals(res)) {
return null;
} else {
connection.watch(getName());
LockValue lock = (LockValue) connection.get(getName());
if (lock != null && lock.equals(currentLock)) {
lock.incCounter();
connection.multi();
connection.psetex(getName(), time, lock);
if (connection.exec().size() == 1) {
return null;
}
}
connection.unwatch();
Long ttl = connection.pttl(getName());
return ttl;
}
}
});
private Long tryLockInner(final long leaseTime, final TimeUnit unit) {
internalLockLeaseTime = unit.toMillis(leaseTime);
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager)
.evalR("local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " +
" return nil; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " +
" return nil; " +
" end;" +
" return redis.call('pttl', KEYS[1]); " +
"end",
RScript.ReturnType.INTEGER,
keys, Collections.singletonList(id.toString() + "-" + Thread.currentThread().getId()), Collections.singletonList(internalLockLeaseTime));
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
@ -399,45 +340,39 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public void unlock() {
connectionManager.write(getName(), new SyncOperation<Object, Void>() {
@Override
public Void execute(RedisConnection<Object, Object> connection) {
LockValue lock = (LockValue) connection.get(getName());
if (lock != null) {
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock.equals(currentLock)) {
if (lock.getCounter() > 1) {
lock.decCounter();
connection.set(getName(), lock);
} else {
unlock(connection);
}
} else {
throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
} else {
// could be deleted
}
return null;
}
});
}
private void unlock(RedisConnection<Object, Object> connection) {
int counter = 0;
while (counter < 5) {
connection.multi();
connection.del(getName());
connection.publish(getChannelName(), unlockMessage);
List<Object> res = connection.exec();
if (res.size() == 2) {
return;
}
counter++;
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
String opStatus = new RedissonScript(connectionManager)
.evalR("local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('publish', ARGV[4], ARGV[2]); " +
" return 'OK'; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" o['c'] = o['c'] - 1; " +
" if (o['c'] > 0) then " +
" redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[3]); " +
" return 'FALSE';"+
" else " +
" redis.call('del', KEYS[1]);" +
" redis.call('publish', ARGV[4], ARGV[2]); " +
" return 'OK';"+
" end" +
" end;" +
" return nil; " +
"end",
RScript.ReturnType.STATUS,
keys, Arrays.asList(id.toString() + "-" + Thread.currentThread().getId(), unlockMessage), Arrays.asList(internalLockLeaseTime, getChannelName()));
if ("OK".equals(opStatus)) {
stopRefreshTask();
} else if ("FALSE".equals(opStatus)) {
//do nothing
} else {
throw new IllegalStateException("Can't unlock lock Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
throw new IllegalStateException("Can't unlock lock after 5 attempts. Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
@Override
@ -448,45 +383,61 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public void forceUnlock() {
connectionManager.write(getName(), new SyncOperation<Object, Void>() {
@Override
public Void execute(RedisConnection<Object, Object> connection) {
unlock(connection);
return null;
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
stopRefreshTask();
new RedissonScript(connectionManager)
.evalR("redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return 'OK'",
RScript.ReturnType.STATUS,
keys, Collections.singletonList(unlockMessage), Collections.singletonList(getChannelName()));
}
@Override
public boolean isLocked() {
return getCurrentLock() != null;
}
private LockValue getCurrentLock() {
LockValue lock = connectionManager.read(getName(), new ResultOperation<LockValue, LockValue>() {
return connectionManager.read(new SyncOperation<Boolean, Boolean>() {
@Override
protected Future<LockValue> execute(RedisAsyncConnection<Object, LockValue> async) {
return async.get(getName());
public Boolean execute(RedisConnection<Object, Boolean> conn) {
return conn.exists(getName());
}
});
return lock;
}
@Override
public boolean isHeldByCurrentThread() {
LockValue lock = getCurrentLock();
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
return lock != null && lock.equals(currentLock);
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
String opStatus = new RedissonScript(connectionManager)
.eval("local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" return nil; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" return 'OK'; " +
" else" +
" return nil; " +
" end;" +
"end",
RScript.ReturnType.STATUS,
keys, id.toString() + "-" + Thread.currentThread().getId());
return "OK".equals(opStatus);
}
@Override
public int getHoldCount() {
LockValue lock = getCurrentLock();
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock != null && lock.equals(currentLock)) {
return lock.getCounter();
}
return 0;
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
Long opStatus = new RedissonScript(connectionManager)
.eval("local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" return 0; " +
"else " +
" local o = cjson.decode(v); " +
" return o['c']; " +
"end",
RScript.ReturnType.INTEGER,
keys, id.toString() + "-" + Thread.currentThread().getId());
return opStatus.intValue();
}
@Override

@ -15,16 +15,14 @@
*/
package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.*;
import org.redisson.async.AsyncOperation;
import org.redisson.async.OperationListener;
import org.redisson.async.ResultOperation;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.Predicate;
@ -32,6 +30,7 @@ import org.redisson.core.RMap;
import org.redisson.core.RScript;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.output.MapScanResult;
import io.netty.util.concurrent.Future;
@ -129,6 +128,9 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public void putAll(final Map<? extends K, ? extends V> map) {
if (map.size() == 0) {
return;
}
connectionManager.write(getName(), new ResultOperation<String, Object>() {
@Override
public Future<String> execute(RedisAsyncConnection<Object, Object> async) {

@ -70,6 +70,21 @@ public class RedissonScript implements RScript {
});
}
@Override
public <R> R evalR(String luaScript, ReturnType returnType, List<Object> keys, List<?> values, List<?> rawValues) {
return (R) connectionManager.get(evalAsyncR(luaScript, returnType, keys, values, rawValues));
}
@Override
public <R> Future<R> evalAsyncR(final String luaScript, final ReturnType returnType, final List<Object> keys, final List<?> values, final List<?> rawValues) {
return connectionManager.writeAsync(new ResultOperation<R, Object>() {
@Override
protected Future<R> execute(RedisAsyncConnection<Object, Object> async) {
return async.evalR(luaScript, ScriptOutputType.valueOf(returnType.toString()), keys, values, rawValues);
}
});
}
@Override
public <R> R evalSha(String shaDigest, ReturnType returnType) {
return evalSha(shaDigest, returnType, Collections.emptyList());

@ -18,10 +18,7 @@ package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.*;
import org.redisson.async.AsyncOperation;
import org.redisson.async.OperationListener;
@ -121,7 +118,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
}
// lazy init iterator
hasNext();
// hasNext();
iter.remove();
RedissonSet.this.remove(value);
removeExecuted = true;
@ -207,7 +204,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
if (c.isEmpty()) {
return false;
}
Long res = connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
public Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
@ -219,14 +216,13 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public boolean retainAll(Collection<?> c) {
boolean changed = false;
for (Object object : this) {
List<V> toRemove = new ArrayList<V>();
for (V object : this) {
if (!c.contains(object)) {
remove(object);
changed = true;
toRemove.add(object);
}
}
return changed;
return removeAll(toRemove);
}
@Override

@ -15,11 +15,9 @@
*/
package org.redisson.codec;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
/**
*
@ -30,7 +28,7 @@ public class SerializationCodec implements RedissonCodec {
@Override
public Object decodeKey(ByteBuffer bytes) {
return decode(bytes);
return new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit(), Charset.forName("ASCII"));
}
@Override
@ -50,7 +48,7 @@ public class SerializationCodec implements RedissonCodec {
@Override
public byte[] encodeKey(Object key) {
return encodeValue(key);
return key.toString().getBytes(Charset.forName("ASCII"));
}
@Override
@ -73,7 +71,7 @@ public class SerializationCodec implements RedissonCodec {
@Override
public byte[] encodeMapKey(Object key) {
return encodeKey(key);
return encodeValue(key);
}
@Override
@ -83,7 +81,18 @@ public class SerializationCodec implements RedissonCodec {
@Override
public Object decodeMapKey(ByteBuffer bytes) {
return decodeKey(bytes);
return decodeValue(bytes);
}
protected String decodeAscii(ByteBuffer bytes) {
if (bytes == null) {
return null;
}
char[] chars = new char[bytes.remaining()];
for (int i = 0; i < chars.length; i++) {
chars[i] = (char) bytes.get();
}
return new String(chars);
}
}

@ -16,6 +16,8 @@
package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import org.redisson.async.AsyncOperation;
@ -26,6 +28,8 @@ import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import java.util.concurrent.TimeUnit;
/**
*
* @author Nikita Koksharov
@ -90,4 +94,5 @@ public interface ConnectionManager {
EventLoopGroup getGroup();
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
}

@ -805,4 +805,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return group;
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
return timer.newTimeout(task, delay, unit);
}
}

@ -21,7 +21,7 @@ import java.util.List;
public interface RScript {
enum ReturnType {BOOLEAN, INTEGER, MULTI, STATUS, VALUE};
enum ReturnType {BOOLEAN, INTEGER, MULTI, STATUS, VALUE, MAPVALUE, MAPVALUELIST};
List<Boolean> scriptExists(String ... shaDigests);
@ -38,7 +38,11 @@ public interface RScript {
String scriptLoad(String luaScript);
Future<String> scriptLoadAsync(String luaScript);
<R> R evalR(String luaScript, ReturnType returnType, List<Object> keys, List<?> values, List<?> rawValues);
<R> Future<R> evalAsyncR(String luaScript, ReturnType returnType, List<Object> keys, List<?> values, List<?> rawValues);
<R> R evalSha(String shaDigest, ReturnType returnType);
<R> R evalSha(String shaDigest, ReturnType returnType, List<Object> keys, Object... values);

@ -11,11 +11,11 @@ import java.util.concurrent.TimeUnit;
public abstract class BaseConcurrentTest {
protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
ExecutorService executor = Executors.newCachedThreadPool();
final Map<Integer, Redisson> instances = new HashMap<Integer, Redisson>();
for (int i = 0; i < iterations; i++) {
instances.put(i, Redisson.create());
instances.put(i, BaseTest.createInstance());
}
long watch = System.currentTimeMillis();
@ -35,7 +35,7 @@ public abstract class BaseConcurrentTest {
System.out.println("multi: " + (System.currentTimeMillis() - watch));
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
executor = Executors.newCachedThreadPool();
for (final Redisson redisson : instances.values()) {
executor.execute(new Runnable() {
@ -53,7 +53,7 @@ public abstract class BaseConcurrentTest {
protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final Redisson redisson = Redisson.create();
final Redisson redisson = BaseTest.createInstance();
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {

@ -6,6 +6,7 @@ import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.redisson.codec.SerializationCodec;
public abstract class BaseTest {
@ -13,7 +14,18 @@ public abstract class BaseTest {
@Before
public void before() {
redisson = Redisson.create();
this.redisson = createInstance();
}
public static Redisson createInstance() {
String redisAddress = System.getProperty("redisAddress");
if (redisAddress == null) {
redisAddress = "127.0.0.1:6379";
}
Config config = new Config();
config.useSingleServer().setAddress(redisAddress);
// config.setCodec(new SerializationCodec());
return Redisson.create(config);
}
@After

@ -17,7 +17,7 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
public void testAdd_SingleInstance() throws InterruptedException {
final String name = "testAdd_SingleInstance";
Redisson r = Redisson.create();
Redisson r = BaseTest.createInstance();
RSortedSet<Integer> map = r.getSortedSet(name);
map.clear();
@ -54,7 +54,7 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
public void testAddNegative_SingleInstance() throws InterruptedException {
final String name = "testAddNegative_SingleInstance";
Redisson r = Redisson.create();
Redisson r = BaseTest.createInstance();
RSortedSet<Integer> map = r.getSortedSet(name);
map.clear();

@ -152,4 +152,36 @@ public class RedissonBlockingQueueTest extends BaseTest {
assertThat(counter.get(), equalTo(total));
queue.delete();
}
@Test
public void testDrainToCollection() throws Exception {
RBlockingQueue<Object> queue1 = redisson.getBlockingQueue("queue1");
queue1.put(1);
queue1.put(2L);
queue1.put("e");
ArrayList<Object> dst = new ArrayList<Object>();
queue1.drainTo(dst);
MatcherAssert.assertThat(dst, Matchers.<Object>contains(1, 2L, "e"));
Assert.assertEquals(0, queue1.size());
}
@Test
public void testDrainToCollectionLimited() throws Exception {
RBlockingQueue<Object> queue1 = redisson.getBlockingQueue("queue1");
queue1.put(1);
queue1.put(2L);
queue1.put("e");
ArrayList<Object> dst = new ArrayList<Object>();
queue1.drainTo(dst, 2);
MatcherAssert.assertThat(dst, Matchers.<Object>contains(1, 2L));
Assert.assertEquals(1, queue1.size());
dst.clear();
queue1.drainTo(dst, 2);
MatcherAssert.assertThat(dst, Matchers.<Object>contains("e"));
}
}

@ -13,7 +13,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
public void testSingleReplaceOldValue_SingleInstance() throws InterruptedException {
final String name = "testSingleReplaceOldValue_SingleInstance";
ConcurrentMap<String, String> map = Redisson.create().getMap(name);
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.put("1", "122");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@ -25,7 +25,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
}
});
ConcurrentMap<String, String> testMap = Redisson.create().getMap(name);
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
Assert.assertEquals("32", testMap.get("1"));
assertMapSize(1, name);
@ -35,7 +35,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
public void testSingleRemoveValue_SingleInstance() throws InterruptedException {
final String name = "testSingleRemoveValue_SingleInstance";
ConcurrentMap<String, String> map = Redisson.create().getMap(name);
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.putIfAbsent("1", "0");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
@ -52,7 +52,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
public void testSingleReplace_SingleInstance() throws InterruptedException {
final String name = "testSingleReplace_SingleInstance";
ConcurrentMap<String, String> map = Redisson.create().getMap(name);
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.put("1", "0");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@ -63,7 +63,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
}
});
ConcurrentMap<String, String> testMap = Redisson.create().getMap(name);
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
Assert.assertEquals("3", testMap.get("1"));
assertMapSize(1, name);
@ -73,7 +73,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
public void test_Multi_Replace_MultiInstance() throws InterruptedException {
final String name = "test_Multi_Replace_MultiInstance";
Redisson redisson = Redisson.create();
Redisson redisson = BaseTest.createInstance();
ConcurrentMap<Integer, Integer> map = redisson.getMap(name);
for (int i = 0; i < 5; i++) {
map.put(i, 1);
@ -88,7 +88,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
}
});
ConcurrentMap<Integer, Integer> testMap = Redisson.create().getMap(name);
ConcurrentMap<Integer, Integer> testMap = BaseTest.createInstance().getMap(name);
for (Integer value : testMap.values()) {
Assert.assertEquals(2, (int)value);
}
@ -102,7 +102,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
public void test_Multi_RemoveValue_MultiInstance() throws InterruptedException {
final String name = "test_Multi_RemoveValue_MultiInstance";
ConcurrentMap<Integer, Integer> map = Redisson.create().getMap(name);
ConcurrentMap<Integer, Integer> map = BaseTest.createInstance().getMap(name);
for (int i = 0; i < 10; i++) {
map.put(i, 1);
}
@ -123,7 +123,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
public void testSinglePutIfAbsent_SingleInstance() throws InterruptedException {
final String name = "testSinglePutIfAbsent_SingleInstance";
ConcurrentMap<String, String> map = Redisson.create().getMap(name);
ConcurrentMap<String, String> map = BaseTest.createInstance().getMap(name);
map.putIfAbsent("1", "0");
testSingleInstanceConcurrency(100, new RedissonRunnable() {
@Override
@ -133,7 +133,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
}
});
ConcurrentMap<String, String> testMap = Redisson.create().getMap(name);
ConcurrentMap<String, String> testMap = BaseTest.createInstance().getMap(name);
Assert.assertEquals("0", testMap.get("1"));
assertMapSize(1, name);
@ -168,7 +168,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest {
}
private void assertMapSize(int size, String name) {
Map<String, String> map = Redisson.create().getMap(name);
Map<String, String> map = BaseTest.createInstance().getMap(name);
Assert.assertEquals(size, map.size());
clear(map);
}

@ -15,7 +15,7 @@ public class RedissonCountDownLatchConcurrentTest {
public void testSingleCountDownAwait_SingleInstance() throws InterruptedException {
final int iterations = Runtime.getRuntime().availableProcessors()*3;
Redisson redisson = Redisson.create();
Redisson redisson = BaseTest.createInstance();
final RCountDownLatch latch = redisson.getCountDownLatch("latch");
latch.trySetCount(iterations);

@ -8,12 +8,10 @@ import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RCountDownLatch;
public class RedissonCountDownLatchTest {
public class RedissonCountDownLatchTest extends BaseTest {
@Test
public void testAwaitTimeout() throws InterruptedException {
Redisson redisson = Redisson.create();
ExecutorService executor = Executors.newFixedThreadPool(2);
final RCountDownLatch latch = redisson.getCountDownLatch("latch1");
@ -48,13 +46,10 @@ public class RedissonCountDownLatchTest {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
redisson.shutdown();
}
@Test
public void testAwaitTimeoutFail() throws InterruptedException {
Redisson redisson = Redisson.create();
ExecutorService executor = Executors.newFixedThreadPool(2);
final RCountDownLatch latch = redisson.getCountDownLatch("latch1");
@ -88,15 +83,14 @@ public class RedissonCountDownLatchTest {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
redisson.shutdown();
}
@Test
public void testCountDown() throws InterruptedException {
Redisson redisson = Redisson.create();
RCountDownLatch latch = redisson.getCountDownLatch("latch");
latch.trySetCount(1);
latch.trySetCount(2);
Assert.assertEquals(2, latch.getCount());
latch.countDown();
Assert.assertEquals(1, latch.getCount());
latch.countDown();
Assert.assertEquals(0, latch.getCount());
@ -131,8 +125,25 @@ public class RedissonCountDownLatchTest {
latch4.countDown();
Assert.assertEquals(0, latch.getCount());
latch4.await();
}
@Test
public void testDelete() throws Exception {
RCountDownLatch latch = redisson.getCountDownLatch("latch");
latch.trySetCount(1);
Assert.assertTrue(latch.delete());
}
redisson.shutdown();
@Test(expected = IllegalStateException.class)
public void testDeleteFailed() throws Exception {
RCountDownLatch latch = redisson.getCountDownLatch("latch");
Assert.assertTrue(latch.delete());
}
@Test
public void testTrySetCount() throws Exception {
RCountDownLatch latch = redisson.getCountDownLatch("latch");
Assert.assertTrue(latch.trySetCount(1));
Assert.assertFalse(latch.trySetCount(2));
}
}

@ -3,12 +3,7 @@ package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.*;
import org.hamcrest.Matchers;
import org.junit.Assert;
@ -38,7 +33,7 @@ public class RedissonListTest extends BaseTest {
public void operationComplete(Future<Boolean> future) throws Exception {
list.addAsync(2L);
}
});
}).awaitUninterruptibly();
Assert.assertThat(list, Matchers.contains(1L, 2L));
}
@ -465,6 +460,44 @@ public class RedissonListTest extends BaseTest {
Assert.assertTrue(list.isEmpty());
}
@Test
public void testRetainAll() {
List<Integer> list = redisson.getList("list");
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
Assert.assertTrue(list.retainAll(Arrays.asList(3, 2, 10, 6)));
Assert.assertThat(list, Matchers.contains(2, 3));
Assert.assertEquals(2, list.size());
}
@Test
public void testRetainAllEmpty() {
List<Integer> list = redisson.getList("list");
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
Assert.assertTrue(list.retainAll(Collections.<Integer>emptyList()));
Assert.assertEquals(0, list.size());
}
@Test
public void testRetainAllNoModify() {
List<Integer> list = redisson.getList("list");
list.add(1);
list.add(2);
Assert.assertFalse(list.retainAll(Arrays.asList(1, 2))); // nothing changed
Assert.assertThat(list, Matchers.contains(1, 2));
}
@Test
public void testAddAllIndex() {
@ -487,7 +520,7 @@ public class RedissonListTest extends BaseTest {
Assert.assertThat(list, Matchers.contains(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5));
list.addAll(0, Arrays.asList(6,7));
list.addAll(0, Arrays.asList(6, 7));
Assert.assertThat(list, Matchers.contains(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5));
}
@ -501,7 +534,7 @@ public class RedissonListTest extends BaseTest {
list.add(4);
list.add(5);
list.addAll(2, Arrays.asList(7,8,9));
list.addAll(2, Arrays.asList(7, 8, 9));
list.addAll(list.size()-1, Arrays.asList(9, 1, 9));
@ -526,13 +559,20 @@ public class RedissonListTest extends BaseTest {
list.add(4);
list.add(5);
list.addAll(Arrays.asList(7,8,9));
Assert.assertTrue(list.addAll(Arrays.asList(7, 8, 9)));
list.addAll(Arrays.asList(9, 1, 9));
Assert.assertTrue(list.addAll(Arrays.asList(9, 1, 9)));
Assert.assertThat(list, Matchers.contains(1, 2, 3, 4, 5, 7, 8, 9, 9, 1, 9));
}
@Test
public void testAddAllEmpty() throws Exception {
List<Integer> list = redisson.getList("list");
Assert.assertFalse(list.addAll(Collections.<Integer>emptyList()));
Assert.assertEquals(0, list.size());
}
@Test
public void testContainsAll() {
List<Integer> list = redisson.getList("list");
@ -553,10 +593,10 @@ public class RedissonListTest extends BaseTest {
list.add("5");
list.add("3");
Assert.assertArrayEquals(list.toArray(), new Object[] {"1", "4", "2", "5", "3"});
Assert.assertArrayEquals(list.toArray(), new Object[]{"1", "4", "2", "5", "3"});
String[] strs = list.toArray(new String[0]);
Assert.assertArrayEquals(strs, new String[] {"1", "4", "2", "5", "3"});
Assert.assertArrayEquals(strs, new String[]{"1", "4", "2", "5", "3"});
}
@ -694,4 +734,14 @@ public class RedissonListTest extends BaseTest {
Assert.assertThat(list, Matchers.contains("1", "3", "5", "6"));
}
@Test
public void testCodec() {
List<Object> list = redisson.getList("list");
list.add(1);
list.add(2L);
list.add("3");
list.add("e");
Assert.assertThat(list, Matchers.<Object>contains(1, 2L, "3", "e"));
}
}

@ -17,7 +17,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
@Before
public void before() {
redisson = Redisson.create();
redisson = BaseTest.createInstance();
}
@After
@ -51,6 +51,23 @@ public class RedissonLockTest extends BaseConcurrentTest {
lock.unlock();
}
@Test
public void testAutoExpire() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
testSingleInstanceConcurrency(1, new RedissonRunnable() {
@Override
public void run(Redisson redisson) {
RLock lock = redisson.getLock("lock");
lock.lock();
latch.countDown();
}
});
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
RLock lock = redisson.getLock("lock");
Thread.sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS + 1));
Assert.assertFalse("Transient lock expired automatically", lock.isLocked());
}
@Test
public void testGetHoldCount() {
@ -166,11 +183,22 @@ public class RedissonLockTest extends BaseConcurrentTest {
}
@Test
public void testReentrancy() {
public void testReentrancy() throws InterruptedException {
Lock lock = redisson.getLock("lock1");
lock.lock();
lock.lock();
Assert.assertTrue(lock.tryLock());
Assert.assertTrue(lock.tryLock());
lock.unlock();
// next row for test renew expiration tisk.
//Thread.currentThread().sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS*2));
Thread thread1 = new Thread() {
@Override
public void run() {
RLock lock1 = (RedissonLock) redisson.getLock("lock1");
Assert.assertFalse(lock1.tryLock());
}
};
thread1.start();
thread1.join();
lock.unlock();
}
@ -187,7 +215,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
System.out.println("lock1 " + Thread.currentThread().getId());
lock.lock();
System.out.println("lock2 "+ Thread.currentThread().getId());
lockedCounter.set(lockedCounter.get() + 1);
lockedCounter.incrementAndGet();
System.out.println("lockedCounter " + lockedCounter);
System.out.println("unlock1 "+ Thread.currentThread().getId());
lock.unlock();
@ -213,7 +241,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.set(lockedCounter.get() + 1);
lockedCounter.incrementAndGet();
redisson.getLock("testConcurrency_MultiInstance1").unlock();
}
}
@ -232,7 +260,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
public void run(Redisson redisson) {
Lock lock = redisson.getLock("testConcurrency_MultiInstance2");
lock.lock();
lockedCounter.set(lockedCounter.get() + 1);
lockedCounter.incrementAndGet();
lock.unlock();
}
});

@ -388,6 +388,22 @@ public class RedissonMapTest extends BaseTest {
Assert.assertEquals("6", val2.getValue());
}
@Test
public void testPutIfAbsent() throws Exception {
ConcurrentMap<SimpleKey, SimpleValue> map = redisson.getMap("simple");
SimpleKey key = new SimpleKey("1");
SimpleValue value = new SimpleValue("2");
map.put(key, value);
Assert.assertEquals(value, map.putIfAbsent(key, new SimpleValue("3")));
Assert.assertEquals(value, map.get(key));
SimpleKey key1 = new SimpleKey("2");
SimpleValue value1 = new SimpleValue("4");
Assert.assertNull(map.putIfAbsent(key1, value1));
Assert.assertEquals(value1, map.get(key1));
}
@Test
public void testSize() {
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");

@ -2,13 +2,8 @@ package org.redisson;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import org.hamcrest.MatcherAssert;
@ -20,7 +15,7 @@ import org.redisson.core.RSortedSet;
public class RedissonSetTest extends BaseTest {
public static class SimpleBean {
public static class SimpleBean implements Serializable {
private Long lng;
@ -158,14 +153,37 @@ public class RedissonSetTest extends BaseTest {
@Test
public void testRetainAll() {
Set<Integer> set = redisson.getSet("set");
for (int i = 0; i < 200; i++) {
for (int i = 0; i < 20000; i++) {
set.add(i);
}
Assert.assertTrue(set.retainAll(Arrays.asList(1, 2)));
Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2));
Assert.assertEquals(2, set.size());
}
// @Test
// public void testIteratorRemoveHighVolume() {
// Set<Integer> set = redisson.getSet("set") /*new HashSet<Integer>()*/;
// for (int i = 0; i < 120000; i++) {
// set.add(i);
// }
// int cnt = 0;
// Iterator<Integer> iterator = set.iterator();
// while (iterator.hasNext()) {
// Integer integer = iterator.next();
// if (integer > -1) { // always
// iterator.remove();
// }
// cnt++;
// }
// System.out.println("-----------");
// for (Integer integer : set) {
// System.out.println(integer);
// }
// Assert.assertEquals(20000, cnt);
// }
@Test
public void testContainsAll() {
Set<Integer> set = redisson.getSet("set");
@ -234,4 +252,27 @@ public class RedissonSetTest extends BaseTest {
Assert.assertEquals(5, set.size());
}
@Test
public void testRetainAllEmpty() {
Set<Integer> set = redisson.getSet("set");
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
Assert.assertTrue(set.retainAll(Collections.<Integer>emptyList()));
Assert.assertEquals(0, set.size());
}
@Test
public void testRetainAllNoModify() {
Set<Integer> set = redisson.getSet("set");
set.add(1);
set.add(2);
Assert.assertFalse(set.retainAll(Arrays.asList(1, 2))); // nothing changed
Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2));
}
}

@ -1,5 +1,6 @@
package org.redisson;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
@ -9,7 +10,7 @@ import org.redisson.core.RTopic;
public class RedissonTopicTest {
public static class Message {
public static class Message implements Serializable {
private String name;
@ -47,7 +48,7 @@ public class RedissonTopicTest {
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
Redisson redisson = Redisson.create();
Redisson redisson = BaseTest.createInstance();
RTopic<Message> topic1 = redisson.getTopic("topic1");
int listenerId = topic1.addListener(new MessageListener<Message>() {
@Override
@ -77,7 +78,7 @@ public class RedissonTopicTest {
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
Redisson redisson1 = Redisson.create();
Redisson redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");
int listenerId = topic1.addListener(new MessageListener<Message>() {
@Override
@ -89,7 +90,7 @@ public class RedissonTopicTest {
topic1.removeListener(listenerId);
Thread.sleep(1000);
Redisson redisson2 = Redisson.create();
Redisson redisson2 = BaseTest.createInstance();
RTopic<Message> topic2 = redisson2.getTopic("topic");
topic2.addListener(new MessageListener<Message>() {
@Override
@ -111,7 +112,7 @@ public class RedissonTopicTest {
public void test() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(2);
Redisson redisson1 = Redisson.create();
Redisson redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");
topic1.addListener(new MessageListener<Message>() {
@Override
@ -121,7 +122,7 @@ public class RedissonTopicTest {
}
});
Redisson redisson2 = Redisson.create();
Redisson redisson2 = BaseTest.createInstance();
RTopic<Message> topic2 = redisson2.getTopic("topic");
topic2.addListener(new MessageListener<Message>() {
@Override
@ -140,7 +141,7 @@ public class RedissonTopicTest {
@Test
public void testListenerRemove() throws InterruptedException {
Redisson redisson1 = Redisson.create();
Redisson redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");
int id = topic1.addListener(new MessageListener<Message>() {
@Override
@ -149,7 +150,7 @@ public class RedissonTopicTest {
}
});
Redisson redisson2 = Redisson.create();
Redisson redisson2 = BaseTest.createInstance();
RTopic<Message> topic2 = redisson2.getTopic("topic");
topic1.removeListener(id);
topic2.publish(new Message("123"));

@ -1,6 +1,8 @@
package org.redisson;
public class TestObject implements Comparable<TestObject> {
import java.io.Serializable;
public class TestObject implements Comparable<TestObject>, Serializable {
private String name;
private String value;

Loading…
Cancel
Save