From 98c21fbd18da21b28848168e750d4af14fc30f65 Mon Sep 17 00:00:00 2001 From: Andrew Kolpakov Date: Fri, 28 Nov 2014 13:23:37 +0600 Subject: [PATCH 1/2] use LUA scripts for transactional operations --- .../redis/RedisAsyncConnection.java | 31 +- .../lambdaworks/redis/ScriptOutputType.java | 1 + .../redis/output/IntegerOutput.java | 4 +- .../redis/output/MapScanOutput.java | 7 +- .../redis/output/ValueSetScanOutput.java | 6 +- .../redis/protocol/CommandHandler.java | 20 ++ .../java/org/redisson/RedissonAtomicLong.java | 96 ++--- .../org/redisson/RedissonBlockingQueue.java | 66 ++-- .../org/redisson/RedissonCountDownLatch.java | 102 +++--- src/main/java/org/redisson/RedissonList.java | 183 +++++----- src/main/java/org/redisson/RedissonLock.java | 331 ++++++++---------- src/main/java/org/redisson/RedissonMap.java | 18 +- .../java/org/redisson/RedissonScript.java | 15 + src/main/java/org/redisson/RedissonSet.java | 18 +- .../redisson/codec/SerializationCodec.java | 25 +- .../connection/ConnectionManager.java | 5 + .../MasterSlaveConnectionManager.java | 5 + src/main/java/org/redisson/core/RScript.java | 8 +- .../java/org/redisson/BaseConcurrentTest.java | 8 +- src/test/java/org/redisson/BaseTest.java | 14 +- .../ConcurrentRedissonSortedSetTest.java | 4 +- .../redisson/RedissonBlockingQueueTest.java | 32 ++ .../redisson/RedissonConcurrentMapTest.java | 22 +- .../RedissonCountDownLatchConcurrentTest.java | 2 +- .../redisson/RedissonCountDownLatchTest.java | 33 +- .../java/org/redisson/RedissonListTest.java | 76 +++- .../java/org/redisson/RedissonLockTest.java | 42 ++- .../java/org/redisson/RedissonMapTest.java | 16 + .../java/org/redisson/RedissonSetTest.java | 59 +++- .../java/org/redisson/RedissonTopicTest.java | 17 +- src/test/java/org/redisson/TestObject.java | 4 +- 31 files changed, 704 insertions(+), 566 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java index ed95587c0..656edf3b5 100644 --- a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java @@ -36,11 +36,7 @@ import io.netty.util.concurrent.Promise; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -312,6 +308,31 @@ public class RedisAsyncConnection extends ChannelInboundHandlerAdapter { return dispatch(EVAL, output, args); } + public Future evalR(V script, ScriptOutputType type, List keys, List values, List rawValues) { + CommandArgs args = new CommandArgs(codec); + args.add(script.toString()).add(keys.size()).addKeys(keys); + for (Object value : values) { + args.addMapValue((V) value); + } + for (Object value : rawValues) { + if (value instanceof String) { + args.add((String) value); + } else if (value instanceof Integer) { + args.add((Integer) value); + } else if (value instanceof Long) { + args.add((Long) value); + } else if (value instanceof Double) { + args.add((Double) value); + } else if (value instanceof byte[]) { + args.add((byte[]) value); + } else { + throw new IllegalArgumentException("Unsupported raw value type: " + value.getClass()); + } + } + CommandOutput output = newScriptOutput(codec, type); + return dispatch(EVAL, output, args); + } + public Future evalsha(String digest, ScriptOutputType type, List keys, V... values) { CommandArgs args = new CommandArgs(codec); args.add(digest).add(keys.size()).addKeys(keys).addMapValues(values); diff --git a/src/main/java/com/lambdaworks/redis/ScriptOutputType.java b/src/main/java/com/lambdaworks/redis/ScriptOutputType.java index c2c5c3a9d..eebca27c1 100644 --- a/src/main/java/com/lambdaworks/redis/ScriptOutputType.java +++ b/src/main/java/com/lambdaworks/redis/ScriptOutputType.java @@ -10,6 +10,7 @@ package com.lambdaworks.redis; *
  • {@link #INTEGER} 64-bit integer
  • *
  • {@link #STATUS} status string
  • *
  • {@link #VALUE} value
  • + *
  • {@link #MAPVALUE} typed value
  • *
  • {@link #MULTI} of these types
  • . * * diff --git a/src/main/java/com/lambdaworks/redis/output/IntegerOutput.java b/src/main/java/com/lambdaworks/redis/output/IntegerOutput.java index f1b97a4d8..8b17f0f41 100644 --- a/src/main/java/com/lambdaworks/redis/output/IntegerOutput.java +++ b/src/main/java/com/lambdaworks/redis/output/IntegerOutput.java @@ -5,7 +5,9 @@ package com.lambdaworks.redis.output; import com.lambdaworks.redis.codec.RedisCodec; import com.lambdaworks.redis.protocol.CommandOutput; +import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.charset.Charset; /** * 64-bit integer output, may be null. @@ -24,6 +26,6 @@ public class IntegerOutput extends CommandOutput { @Override public void set(ByteBuffer bytes) { - output = null; + output = bytes == null ? null : new Long(decodeAscii(bytes)); } } diff --git a/src/main/java/com/lambdaworks/redis/output/MapScanOutput.java b/src/main/java/com/lambdaworks/redis/output/MapScanOutput.java index 1294a98f2..90d62d421 100644 --- a/src/main/java/com/lambdaworks/redis/output/MapScanOutput.java +++ b/src/main/java/com/lambdaworks/redis/output/MapScanOutput.java @@ -16,7 +16,7 @@ public class MapScanOutput extends CommandOutput @Override public void set(ByteBuffer bytes) { if (output.getPos() == null) { - output.setPos(((Number) codec.decodeValue(bytes)).longValue()); + output.setPos(toLong(bytes)); } else { if (counter % 2 == 0) { output.addValue(codec.decodeMapValue(bytes)); @@ -27,4 +27,9 @@ public class MapScanOutput extends CommandOutput counter++; } + private Long toLong(ByteBuffer bytes) { + return bytes == null ? null : new Long(new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit())); + } + + } diff --git a/src/main/java/com/lambdaworks/redis/output/ValueSetScanOutput.java b/src/main/java/com/lambdaworks/redis/output/ValueSetScanOutput.java index abdf76067..65f1bb034 100644 --- a/src/main/java/com/lambdaworks/redis/output/ValueSetScanOutput.java +++ b/src/main/java/com/lambdaworks/redis/output/ValueSetScanOutput.java @@ -14,10 +14,14 @@ public class ValueSetScanOutput extends CommandOutput extends ChannelDuplexHandler { ByteBuf buf = ctx.alloc().heapBuffer(); cmd.encode(buf); // System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); +// System.out.println("out: " + toHexString(buf)); ctx.write(buf, promise); } + private String toHexString(ByteBuf buf) { + final StringBuilder builder = new StringBuilder(buf.readableBytes() * 2); + buf.forEachByte(new ByteBufProcessor() { + @Override + public boolean process(byte value) throws Exception { + char b = (char) value; + if ((b < ' ' && b != '\n' && b != '\r') || b > '~') { + builder.append("\\x").append(StringUtil.byteToHexStringPadded(value)); + } else { + builder.append(b); + } + return true; + } + }); + return builder.toString(); + } + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException { while (true) { Command cmd = queue.peek(); diff --git a/src/main/java/org/redisson/RedissonAtomicLong.java b/src/main/java/org/redisson/RedissonAtomicLong.java index 71bf1ae57..08967591d 100644 --- a/src/main/java/org/redisson/RedissonAtomicLong.java +++ b/src/main/java/org/redisson/RedissonAtomicLong.java @@ -24,6 +24,11 @@ import org.redisson.core.RAtomicLong; import com.lambdaworks.redis.RedisAsyncConnection; import com.lambdaworks.redis.RedisConnection; +import org.redisson.core.RScript; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; /** * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} @@ -49,26 +54,12 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong @Override public boolean compareAndSet(final long expect, final long update) { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Boolean execute(RedisConnection conn) { - while (true) { - conn.watch(getName()); - - Long value = getLongSafe(conn); - if (value != expect) { - conn.unwatch(); - return false; - } - - conn.multi(); - conn.set(getName(), update); - if (conn.exec().size() == 1) { - return true; - } - } - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + return new RedissonScript(connectionManager).evalR( + "if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('set', KEYS[1], ARGV[2]); return true else return false end", + RScript.ReturnType.BOOLEAN, + keys, Collections.EMPTY_LIST, Arrays.asList(expect, update)); } @Override @@ -88,51 +79,22 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong @Override public long getAndAdd(final long delta) { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Long execute(RedisConnection conn) { - while (true) { - conn.watch(getName()); - - Long value = getLongSafe(conn); - - conn.multi(); - conn.set(getName(), value + delta); - if (conn.exec().size() == 1) { - return value; - } - } - } - - }); - } - - private Long getLongSafe(RedisConnection conn) { - Number n = (Number) conn.get(getName()); - if (n != null) { - return n.longValue(); - } - return 0L; + ArrayList keys = new ArrayList(); + keys.add(getName()); + return new RedissonScript(connectionManager).evalR( + "local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], v + ARGV[1]); return tonumber(v)", + RScript.ReturnType.INTEGER, + keys, Collections.EMPTY_LIST, Collections.singletonList(delta)); } @Override public long getAndSet(final long newValue) { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Long execute(RedisConnection conn) { - while (true) { - conn.watch(getName()); - - Long value = getLongSafe(conn); - - conn.multi(); - conn.set(getName(), newValue); - if (conn.exec().size() == 1) { - return value; - } - } - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + return new RedissonScript(connectionManager).evalR( + "local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)", + RScript.ReturnType.INTEGER, + keys, Collections.EMPTY_LIST, Collections.singletonList(newValue)); } @Override @@ -156,12 +118,12 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong @Override public void set(final long newValue) { - connectionManager.write(getName(), new ResultOperation() { - @Override - protected Future execute(RedisAsyncConnection async) { - return async.set(getName(), newValue); - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + new RedissonScript(connectionManager).evalR( + "redis.call('set', KEYS[1], ARGV[1])", + RScript.ReturnType.STATUS, + keys, Collections.EMPTY_LIST, Collections.singletonList(newValue)); } public String toString() { diff --git a/src/main/java/org/redisson/RedissonBlockingQueue.java b/src/main/java/org/redisson/RedissonBlockingQueue.java index 665e8a66a..870f6b091 100644 --- a/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -28,6 +29,7 @@ import org.redisson.connection.ConnectionManager; import org.redisson.core.RBlockingQueue; import com.lambdaworks.redis.RedisConnection; +import org.redisson.core.RScript; /** * Offers blocking queue facilities through an intermediary @@ -97,49 +99,39 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public int drainTo(Collection c) { - List list = connectionManager.write(getName(), new SyncOperation>() { - @Override - public List execute(RedisConnection conn) { - while (true) { - conn.watch(getName()); - conn.multi(); - conn.lrange(getName(), 0, -1); - conn.ltrim(getName(), 0, -1); - List res = conn.exec(); - if (res.size() == 2) { - List items = (List) res.get(0); - return items; - } - } - } - }); + if (c == null) { + throw new NullPointerException(); + } + ArrayList keys = new ArrayList(); + keys.add(getName()); + List list = new RedissonScript(connectionManager).eval( + "local vals = redis.call('lrange', KEYS[1], 0, -1); " + + "redis.call('ltrim', KEYS[1], -1, 0); " + + "return vals", + RScript.ReturnType.MAPVALUELIST, + keys); c.addAll(list); return list.size(); } @Override public int drainTo(Collection c, final int maxElements) { - List list = connectionManager.write(getName(), new SyncOperation>() { - @Override - public List execute(RedisConnection conn) { - while (true) { - conn.watch(getName()); - Long len = Math.min(conn.llen(getName()), maxElements); - if (len == 0) { - conn.unwatch(); - return Collections.emptyList(); - } - conn.multi(); - conn.lrange(getName(), 0, len - 1); - conn.ltrim(getName(), len, -1); - List res = conn.exec(); - if (res.size() == 2) { - List items = (List) res.get(0); - return items; - } - } - } - }); + if (maxElements <= 0) { + return 0; + } + if (c == null) { + throw new NullPointerException(); + } + + ArrayList keys = new ArrayList(); + keys.add(getName()); + List list = new RedissonScript(connectionManager).evalR( + "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + + "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " + + "redis.call('ltrim', KEYS[1], elemNum + 1, -1); " + + "return vals", + RScript.ReturnType.MAPVALUELIST, + keys, Collections.emptyList(), Collections.singletonList(maxElements)); c.addAll(list); return list.size(); } diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index bbb10f203..c748e5f10 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -15,23 +15,21 @@ */ package org.redisson; +import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import org.redisson.connection.ConnectionManager; +import org.redisson.core.RCountDownLatch; +import org.redisson.core.RScript; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.redisson.async.ResultOperation; -import org.redisson.async.SyncOperation; -import org.redisson.connection.ConnectionManager; -import org.redisson.core.RCountDownLatch; - -import com.lambdaworks.redis.RedisAsyncConnection; -import com.lambdaworks.redis.RedisConnection; -import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; - /** * Distributed alternative to the {@link java.util.concurrent.CountDownLatch} * @@ -195,24 +193,15 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown if (getCount() <= 0) { return; } - - connectionManager.write(getName(), new SyncOperation() { - @Override - public Void execute(RedisConnection conn) { - Long val = conn.decr(getName()); - if (val == 0) { - conn.multi(); - conn.del(getName()); - conn.publish(getChannelName(), zeroCountMessage); - if (conn.exec().size() != 2) { - throw new IllegalStateException(); - } - } else if (val < 0) { - conn.del(getName()); - } - return null; - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + new RedissonScript(connectionManager).evalR( + "local v = redis.call('decr', KEYS[1]);" + + "if v <= 0 then redis.call('del', KEYS[1]) end;" + + "if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" + + "return 'OK'", + RScript.ReturnType.STATUS, + keys, Collections.singletonList(zeroCountMessage), Collections.singletonList(getChannelName())); } private String getEntryName() { @@ -229,53 +218,42 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } private long getCountInner() { - Number val = connectionManager.read(getName(), new ResultOperation() { - @Override - protected Future execute(RedisAsyncConnection async) { - return async.get(getName()); - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + Long val = new RedissonScript(connectionManager).eval( + "return redis.call('get', KEYS[1])", + RScript.ReturnType.INTEGER, + keys); + if (val == null) { return 0; } - return val.longValue(); + return val; } @Override public boolean trySetCount(final long count) { - return connectionManager.write(getName(), new SyncOperation() { - - @Override - public Boolean execute(RedisConnection conn) { - conn.watch(getName()); - Number oldValue = (Number) conn.get(getName()); - if (oldValue != null) { - conn.unwatch(); - return false; - } - conn.multi(); - conn.set(getName(), count); - conn.publish(getChannelName(), newCountMessage); - return conn.exec().size() == 2; - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + return new RedissonScript(connectionManager).evalR( + "if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[2]); redis.call('publish', ARGV[3], ARGV[1]); return true else return false end", + RScript.ReturnType.BOOLEAN, + keys, Collections.singletonList(newCountMessage), Arrays.asList(count, getChannelName())); } @Override public boolean delete() { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Boolean execute(RedisConnection conn) { - conn.multi(); - conn.del(getName()); - conn.publish(getChannelName(), zeroCountMessage); - if (conn.exec().size() != 2) { - throw new IllegalStateException(); - } - return true; - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + Boolean deleted = new RedissonScript(connectionManager).evalR( + "if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end", + RScript.ReturnType.BOOLEAN, + keys, Collections.singletonList(newCountMessage), Collections.singletonList(getChannelName())); + if (!deleted) { + throw new IllegalStateException(); + } + return true; } } diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index a2fa70cd4..5d69cb590 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -15,26 +15,19 @@ */ package org.redisson; +import com.lambdaworks.redis.RedisAsyncConnection; +import com.lambdaworks.redis.RedisConnection; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; -import java.util.NoSuchElementException; - import org.redisson.async.AsyncOperation; import org.redisson.async.OperationListener; import org.redisson.async.ResultOperation; import org.redisson.async.SyncOperation; import org.redisson.connection.ConnectionManager; import org.redisson.core.RList; +import org.redisson.core.RScript; -import com.lambdaworks.redis.RedisAsyncConnection; -import com.lambdaworks.redis.RedisConnection; +import java.util.*; /** * Distributed and concurrent implementation of {@link java.util.List} @@ -163,7 +156,7 @@ public class RedissonList extends RedissonExpirable implements RList { return connectionManager.writeAsync(getName(), new AsyncOperation() { @Override public void execute(final Promise promise, RedisAsyncConnection async) { - async.rpush((Object)getName(), c.toArray()).addListener(new OperationListener(promise, async, this) { + async.rpush((Object) getName(), c.toArray()).addListener(new OperationListener(promise, async, this) { @Override public void onOperationComplete(Future future) throws Exception { promise.setSuccess(true); @@ -180,34 +173,35 @@ public class RedissonList extends RedissonExpirable implements RList { return false; } if (index < size()) { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Boolean execute(RedisConnection conn) { - while (true) { - conn.watch(getName()); - List tail = conn.lrange(getName(), index, size()); - - int first = 0; - int last = 0; - if (index == 0) { - first = size();// truncate the list - last = 0; - } else { - first = 0; - last = index - 1; - } - - conn.multi(); - conn.ltrim(getName(), first, last); - conn.rpush(getName(), coll.toArray()); - conn.rpush(getName(), tail.toArray()); - if (conn.exec().size() == 3) { - return true; - } + + if (index == 0) { // prepend elements to list + final ArrayList elemens = new ArrayList(coll); + Collections.reverse(elemens); + return connectionManager.write(getName(), new SyncOperation() { + @Override + public Boolean execute(RedisConnection conn) { + conn.lpush(getName(), elemens.toArray()); + return true; } - } - }); + }); + } + + // insert into middle of list + + ArrayList keys = new ArrayList(); + keys.add(getName()); + + return "OK".equals(new RedissonScript(connectionManager).evalR( + "local ind = table.remove(ARGV); " + // index is last parameter + "local tail = redis.call('lrange', KEYS[1], ind, -1); " + + "redis.call('ltrim', KEYS[1], 0, ind - 1); " + + "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + + "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + + "return 'OK'", + RScript.ReturnType.STATUS, + keys, new ArrayList(coll), Collections.singletonList(index))); } else { + // append to list return addAll(coll); } } @@ -302,21 +296,17 @@ public class RedissonList extends RedissonExpirable implements RList { public V set(final int index, final V element) { checkIndex(index); - return connectionManager.write(getName(), new SyncOperation() { - @Override - public V execute(RedisConnection conn) { - while (true) { - conn.watch(getName()); - V prev = (V) conn.lindex(getName(), index); - - conn.multi(); - conn.lset(getName(), index, element); - if (conn.exec().size() == 1) { - return prev; - } - } - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + + return new RedissonScript(connectionManager).evalR( + "local v = redis.call('lindex', KEYS[1], ARGV[2]); " + + "redis.call('lset', KEYS[1], ARGV[2], ARGV[1]); " + + "return v", + RScript.ReturnType.VALUE, + keys, Collections.singletonList(element), Collections.singletonList(index) + + ); } @Override @@ -339,26 +329,25 @@ public class RedissonList extends RedissonExpirable implements RList { public V remove(final int index) { checkIndex(index); - return connectionManager.write(getName(), new SyncOperation() { - @Override - public V execute(RedisConnection conn) { - if (index == 0) { + if (index == 0) { + return connectionManager.write(getName(), new SyncOperation() { + @Override + public V execute(RedisConnection conn) { return (V) conn.lpop(getName()); } - while (true) { - conn.watch(getName()); - V prev = (V) conn.lindex(getName(), index); - List tail = conn.lrange(getName(), index + 1, size()); - - conn.multi(); - conn.ltrim(getName(), 0, index - 1); - conn.rpush(getName(), tail.toArray()); - if (conn.exec().size() == 2) { - return prev; - } - } - } - }); + }); + } + // else + ArrayList keys = new ArrayList(); + keys.add(getName()); + return new RedissonScript(connectionManager).evalR( + "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + + "local tail = redis.call('lrange', KEYS[1], ARGV[1]);" + + "redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" + + "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + + "return v", + RScript.ReturnType.VALUE, + keys, Collections.emptyList(), Collections.singletonList(index)); } @Override @@ -367,22 +356,15 @@ public class RedissonList extends RedissonExpirable implements RList { return -1; } - int to = div(size(), batchSize); - for (int i = 0; i < to; i++) { - final int j = i; - List range = connectionManager.read(getName(), new ResultOperation, Object>() { - @Override - protected Future> execute(RedisAsyncConnection async) { - return async.lrange(getName(), j*batchSize, j*batchSize + batchSize - 1); - } - }); - int index = range.indexOf(o); - if (index != -1) { - return index + i*batchSize; - } - } - - return -1; + ArrayList keys = new ArrayList(); + keys.add(getName()); + Long index = new RedissonScript(connectionManager).eval( + "local s = redis.call('llen', KEYS[1]);" + + "for i = 0, s, 1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + + "return -1", + RScript.ReturnType.INTEGER, + keys, o); + return index.intValue(); } @Override @@ -391,24 +373,15 @@ public class RedissonList extends RedissonExpirable implements RList { return -1; } - final int size = size(); - int to = div(size, batchSize); - for (int i = 1; i <= to; i++) { - final int j = i; - final int startIndex = -i*batchSize; - List range = connectionManager.read(getName(), new ResultOperation, Object>() { - @Override - protected Future> execute(RedisAsyncConnection async) { - return async.lrange(getName(), startIndex, size - (j-1)*batchSize); - } - }); - int index = range.lastIndexOf(o); - if (index != -1) { - return Math.max(size + startIndex, 0) + index; - } - } - - return -1; + ArrayList keys = new ArrayList(); + keys.add(getName()); + Long index = new RedissonScript(connectionManager).eval( + "local s = redis.call('llen', KEYS[1]);" + + "for i = s, 0, -1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + + "return -1", + RScript.ReturnType.INTEGER, + keys, o); + return index.intValue(); } @Override diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index eed005e6f..fcf7ffff5 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -15,97 +15,39 @@ */ package org.redisson; +import com.lambdaworks.redis.RedisConnection; +import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import org.redisson.async.SyncOperation; +import org.redisson.connection.ConnectionManager; +import org.redisson.core.RLock; +import org.redisson.core.RScript; -import java.io.Serializable; -import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; -import org.redisson.async.ResultOperation; -import org.redisson.async.SyncOperation; -import org.redisson.connection.ConnectionManager; -import org.redisson.core.RLock; - -import com.lambdaworks.redis.RedisAsyncConnection; -import com.lambdaworks.redis.RedisConnection; -import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; - /** * Distributed implementation of {@link java.util.concurrent.locks.Lock} - * Implements reentrant lock. + * Implements reentrant lock.
    + * Lock will be removed automatically if client disconnects. * * @author Nikita Koksharov * */ -public class RedissonLock extends RedissonObject implements RLock { - - public static class LockValue implements Serializable { - - private static final long serialVersionUID = -8895632286065689476L; - - private UUID id; - private Long threadId; - // need for reentrant support - private int counter; +public class RedissonLock extends RedissonExpirable implements RLock { - public LockValue() { - } - - public LockValue(UUID id, Long threadId) { - super(); - this.id = id; - this.threadId = threadId; - } - - public void decCounter() { - counter--; - } - - public void incCounter() { - counter++; - } - - public int getCounter() { - return counter; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((id == null) ? 0 : id.hashCode()); - result = prime * result + ((threadId == null) ? 0 : threadId.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - LockValue other = (LockValue) obj; - if (id == null) { - if (other.id != null) - return false; - } else if (!id.equals(other.id)) - return false; - if (threadId == null) { - if (other.threadId != null) - return false; - } else if (!threadId.equals(other.threadId)) - return false; - return true; - } - - } + public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30; + private static final ConcurrentMap refreshTaskMap = new ConcurrentHashMap(); + protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS); private final UUID id; @@ -214,7 +156,6 @@ public class RedissonLock extends RedissonObject implements RLock { lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return; } } @@ -224,7 +165,6 @@ public class RedissonLock extends RedissonObject implements RLock { lockInterruptibly(leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return; } } @@ -280,64 +220,65 @@ public class RedissonLock extends RedissonObject implements RLock { } private Long tryLockInner() { - final LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); - currentLock.incCounter(); - - return connectionManager.write(getName(), new SyncOperation() { + Long ttlRemaining = tryLockInner(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS); + if (ttlRemaining == null) { + newRefreshTask(); + } + return ttlRemaining; + } + private void newRefreshTask() { + if (refreshTaskMap.containsKey(getName())) { + return; + } + Timeout task = connectionManager.newTimeout(new TimerTask() { @Override - public Long execute(RedisConnection connection) { - Boolean res = connection.setnx(getName(), currentLock); - if (!res) { - connection.watch(getName()); - LockValue lock = (LockValue) connection.get(getName()); - if (lock != null && lock.equals(currentLock)) { - lock.incCounter(); - connection.multi(); - connection.set(getName(), lock); - if (connection.exec().size() == 1) { - return null; - } - } - connection.unwatch(); - - Long ttl = connection.pttl(getName()); - return ttl; - } - return null; + public void run(Timeout timeout) throws Exception { + expire(internalLockLeaseTime, TimeUnit.MILLISECONDS); + refreshTaskMap.remove(getName()); + newRefreshTask(); // reschedule itself } - }); + }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); + if (refreshTaskMap.putIfAbsent(getName(), task) != null) { + task.cancel(); + } } - private Long tryLockInner(final long leaseTime, final TimeUnit unit) { - final LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); - currentLock.incCounter(); + /** + * Stop refresh timer + * @return true if timer was stopped successfully + */ + private boolean stopRefreshTask() { + boolean returnValue =false; + Timeout task = refreshTaskMap.get(getName()); + if (task != null) { + returnValue = task.cancel(); + refreshTaskMap.remove(getName()); + } + return returnValue; + } - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Long execute(RedisConnection connection) { - long time = unit.toMillis(leaseTime); - String res = connection.setexnx(getName(), currentLock, time); - if ("OK".equals(res)) { - return null; - } else { - connection.watch(getName()); - LockValue lock = (LockValue) connection.get(getName()); - if (lock != null && lock.equals(currentLock)) { - lock.incCounter(); - connection.multi(); - connection.psetex(getName(), time, lock); - if (connection.exec().size() == 1) { - return null; - } - } - connection.unwatch(); - Long ttl = connection.pttl(getName()); - return ttl; - } - } - }); + private Long tryLockInner(final long leaseTime, final TimeUnit unit) { + internalLockLeaseTime = unit.toMillis(leaseTime); + + ArrayList keys = new ArrayList(); + keys.add(getName()); + return new RedissonScript(connectionManager) + .evalR("local v = redis.call('get', KEYS[1]); " + + "if (v == false) then " + + " redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " + + " return nil; " + + "else " + + " local o = cjson.decode(v); " + + " if (o['o'] == ARGV[1]) then " + + " o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " + + " return nil; " + + " end;" + + " return redis.call('pttl', KEYS[1]); " + + "end", + RScript.ReturnType.INTEGER, + keys, Collections.singletonList(id.toString() + "-" + Thread.currentThread().getId()), Collections.singletonList(internalLockLeaseTime)); } public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { @@ -399,45 +340,39 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public void unlock() { - connectionManager.write(getName(), new SyncOperation() { - @Override - public Void execute(RedisConnection connection) { - LockValue lock = (LockValue) connection.get(getName()); - if (lock != null) { - LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); - if (lock.equals(currentLock)) { - if (lock.getCounter() > 1) { - lock.decCounter(); - connection.set(getName(), lock); - } else { - unlock(connection); - } - } else { - throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: " - + id + " thread-id: " + Thread.currentThread().getId()); - } - } else { - // could be deleted - } - return null; - } - }); - } - - private void unlock(RedisConnection connection) { - int counter = 0; - while (counter < 5) { - connection.multi(); - connection.del(getName()); - connection.publish(getChannelName(), unlockMessage); - List res = connection.exec(); - if (res.size() == 2) { - return; - } - counter++; + ArrayList keys = new ArrayList(); + keys.add(getName()); + String opStatus = new RedissonScript(connectionManager) + .evalR("local v = redis.call('get', KEYS[1]); " + + "if (v == false) then " + + " redis.call('publish', ARGV[4], ARGV[2]); " + + " return 'OK'; " + + "else " + + " local o = cjson.decode(v); " + + " if (o['o'] == ARGV[1]) then " + + " o['c'] = o['c'] - 1; " + + " if (o['c'] > 0) then " + + " redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[3]); " + + " return 'FALSE';"+ + " else " + + " redis.call('del', KEYS[1]);" + + " redis.call('publish', ARGV[4], ARGV[2]); " + + " return 'OK';"+ + " end" + + " end;" + + " return nil; " + + "end", + RScript.ReturnType.STATUS, + keys, Arrays.asList(id.toString() + "-" + Thread.currentThread().getId(), unlockMessage), Arrays.asList(internalLockLeaseTime, getChannelName())); + if ("OK".equals(opStatus)) { + stopRefreshTask(); + } else if ("FALSE".equals(opStatus)) { + //do nothing + } else { + throw new IllegalStateException("Can't unlock lock Current id: " + + id + " thread-id: " + Thread.currentThread().getId()); } - throw new IllegalStateException("Can't unlock lock after 5 attempts. Current id: " - + id + " thread-id: " + Thread.currentThread().getId()); + } @Override @@ -448,45 +383,61 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public void forceUnlock() { - connectionManager.write(getName(), new SyncOperation() { - @Override - public Void execute(RedisConnection connection) { - unlock(connection); - return null; - } - }); + ArrayList keys = new ArrayList(); + keys.add(getName()); + stopRefreshTask(); + new RedissonScript(connectionManager) + .evalR("redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return 'OK'", + RScript.ReturnType.STATUS, + keys, Collections.singletonList(unlockMessage), Collections.singletonList(getChannelName())); } @Override public boolean isLocked() { - return getCurrentLock() != null; - } - - private LockValue getCurrentLock() { - LockValue lock = connectionManager.read(getName(), new ResultOperation() { + return connectionManager.read(new SyncOperation() { @Override - protected Future execute(RedisAsyncConnection async) { - return async.get(getName()); + public Boolean execute(RedisConnection conn) { + return conn.exists(getName()); } }); - return lock; } @Override public boolean isHeldByCurrentThread() { - LockValue lock = getCurrentLock(); - LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); - return lock != null && lock.equals(currentLock); + ArrayList keys = new ArrayList(); + keys.add(getName()); + String opStatus = new RedissonScript(connectionManager) + .eval("local v = redis.call('get', KEYS[1]); " + + "if (v == false) then " + + " return nil; " + + "else " + + " local o = cjson.decode(v); " + + " if (o['o'] == ARGV[1]) then " + + " return 'OK'; " + + " else" + + " return nil; " + + " end;" + + "end", + RScript.ReturnType.STATUS, + keys, id.toString() + "-" + Thread.currentThread().getId()); + return "OK".equals(opStatus); } @Override public int getHoldCount() { - LockValue lock = getCurrentLock(); - LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); - if (lock != null && lock.equals(currentLock)) { - return lock.getCounter(); - } - return 0; + ArrayList keys = new ArrayList(); + keys.add(getName()); + Long opStatus = new RedissonScript(connectionManager) + .eval("local v = redis.call('get', KEYS[1]); " + + "if (v == false) then " + + " return 0; " + + "else " + + " local o = cjson.decode(v); " + + " return o['c']; " + + "end", + RScript.ReturnType.INTEGER, + keys, id.toString() + "-" + Thread.currentThread().getId()); + return opStatus.intValue(); } @Override diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index 2b7367afb..4610e92ab 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -15,16 +15,14 @@ */ package org.redisson; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + import java.math.BigDecimal; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; +import java.util.*; +import org.redisson.async.AsyncOperation; +import org.redisson.async.OperationListener; import org.redisson.async.ResultOperation; import org.redisson.connection.ConnectionManager; import org.redisson.core.Predicate; @@ -32,6 +30,7 @@ import org.redisson.core.RMap; import org.redisson.core.RScript; import com.lambdaworks.redis.RedisAsyncConnection; +import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.output.MapScanResult; import io.netty.util.concurrent.Future; @@ -129,6 +128,9 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public void putAll(final Map map) { + if (map.size() == 0) { + return; + } connectionManager.write(getName(), new ResultOperation() { @Override public Future execute(RedisAsyncConnection async) { diff --git a/src/main/java/org/redisson/RedissonScript.java b/src/main/java/org/redisson/RedissonScript.java index 24ae83580..da25200b5 100644 --- a/src/main/java/org/redisson/RedissonScript.java +++ b/src/main/java/org/redisson/RedissonScript.java @@ -70,6 +70,21 @@ public class RedissonScript implements RScript { }); } + @Override + public R evalR(String luaScript, ReturnType returnType, List keys, List values, List rawValues) { + return (R) connectionManager.get(evalAsyncR(luaScript, returnType, keys, values, rawValues)); + } + + @Override + public Future evalAsyncR(final String luaScript, final ReturnType returnType, final List keys, final List values, final List rawValues) { + return connectionManager.writeAsync(new ResultOperation() { + @Override + protected Future execute(RedisAsyncConnection async) { + return async.evalR(luaScript, ScriptOutputType.valueOf(returnType.toString()), keys, values, rawValues); + } + }); + } + @Override public R evalSha(String shaDigest, ReturnType returnType) { return evalSha(shaDigest, returnType, Collections.emptyList()); diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index 9e1fde9a5..66f2478e5 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -18,10 +18,7 @@ package org.redisson; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Set; +import java.util.*; import org.redisson.async.AsyncOperation; import org.redisson.async.OperationListener; @@ -121,7 +118,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { } // lazy init iterator - hasNext(); +// hasNext(); iter.remove(); RedissonSet.this.remove(value); removeExecuted = true; @@ -207,7 +204,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { if (c.isEmpty()) { return false; } - + Long res = connectionManager.write(getName(), new ResultOperation() { @Override public Future execute(RedisAsyncConnection async) { @@ -219,14 +216,13 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public boolean retainAll(Collection c) { - boolean changed = false; - for (Object object : this) { + List toRemove = new ArrayList(); + for (V object : this) { if (!c.contains(object)) { - remove(object); - changed = true; + toRemove.add(object); } } - return changed; + return removeAll(toRemove); } @Override diff --git a/src/main/java/org/redisson/codec/SerializationCodec.java b/src/main/java/org/redisson/codec/SerializationCodec.java index c24735a9a..1aeac3901 100644 --- a/src/main/java/org/redisson/codec/SerializationCodec.java +++ b/src/main/java/org/redisson/codec/SerializationCodec.java @@ -15,11 +15,9 @@ */ package org.redisson.codec; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.*; import java.nio.ByteBuffer; +import java.nio.charset.Charset; /** * @@ -30,7 +28,7 @@ public class SerializationCodec implements RedissonCodec { @Override public Object decodeKey(ByteBuffer bytes) { - return decode(bytes); + return new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit(), Charset.forName("ASCII")); } @Override @@ -50,7 +48,7 @@ public class SerializationCodec implements RedissonCodec { @Override public byte[] encodeKey(Object key) { - return encodeValue(key); + return key.toString().getBytes(Charset.forName("ASCII")); } @Override @@ -73,7 +71,7 @@ public class SerializationCodec implements RedissonCodec { @Override public byte[] encodeMapKey(Object key) { - return encodeKey(key); + return encodeValue(key); } @Override @@ -83,7 +81,18 @@ public class SerializationCodec implements RedissonCodec { @Override public Object decodeMapKey(ByteBuffer bytes) { - return decodeKey(bytes); + return decodeValue(bytes); + } + + protected String decodeAscii(ByteBuffer bytes) { + if (bytes == null) { + return null; + } + char[] chars = new char[bytes.remaining()]; + for (int i = 0; i < chars.length; i++) { + chars[i] = (char) bytes.get(); + } + return new String(chars); } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 5e89d2ebb..aeef26918 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -16,6 +16,8 @@ package org.redisson.connection; import io.netty.channel.EventLoopGroup; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import org.redisson.async.AsyncOperation; @@ -26,6 +28,8 @@ import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; +import java.util.concurrent.TimeUnit; + /** * * @author Nikita Koksharov @@ -86,4 +90,5 @@ public interface ConnectionManager { EventLoopGroup getGroup(); + Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index aa5e88713..8858160c3 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -735,4 +735,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return group; } + @Override + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + return timer.newTimeout(task, delay, unit); + } + } diff --git a/src/main/java/org/redisson/core/RScript.java b/src/main/java/org/redisson/core/RScript.java index e2a27f220..321d784de 100644 --- a/src/main/java/org/redisson/core/RScript.java +++ b/src/main/java/org/redisson/core/RScript.java @@ -21,7 +21,7 @@ import java.util.List; public interface RScript { - enum ReturnType {BOOLEAN, INTEGER, MULTI, STATUS, VALUE}; + enum ReturnType {BOOLEAN, INTEGER, MULTI, STATUS, VALUE, MAPVALUE, MAPVALUELIST}; List scriptExists(String ... shaDigests); @@ -38,7 +38,11 @@ public interface RScript { String scriptLoad(String luaScript); Future scriptLoadAsync(String luaScript); - + + R evalR(String luaScript, ReturnType returnType, List keys, List values, List rawValues); + + Future evalAsyncR(String luaScript, ReturnType returnType, List keys, List values, List rawValues); + R evalSha(String shaDigest, ReturnType returnType); R evalSha(String shaDigest, ReturnType returnType, List keys, Object... values); diff --git a/src/test/java/org/redisson/BaseConcurrentTest.java b/src/test/java/org/redisson/BaseConcurrentTest.java index a803f8d43..6d3f0fbc8 100644 --- a/src/test/java/org/redisson/BaseConcurrentTest.java +++ b/src/test/java/org/redisson/BaseConcurrentTest.java @@ -11,11 +11,11 @@ import java.util.concurrent.TimeUnit; public abstract class BaseConcurrentTest { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { - ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); + ExecutorService executor = Executors.newCachedThreadPool(); final Map instances = new HashMap(); for (int i = 0; i < iterations; i++) { - instances.put(i, Redisson.create()); + instances.put(i, BaseTest.createInstance()); } long watch = System.currentTimeMillis(); @@ -35,7 +35,7 @@ public abstract class BaseConcurrentTest { System.out.println("multi: " + (System.currentTimeMillis() - watch)); - executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + executor = Executors.newCachedThreadPool(); for (final Redisson redisson : instances.values()) { executor.execute(new Runnable() { @@ -53,7 +53,7 @@ public abstract class BaseConcurrentTest { protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - final Redisson redisson = Redisson.create(); + final Redisson redisson = BaseTest.createInstance(); long watch = System.currentTimeMillis(); for (int i = 0; i < iterations; i++) { executor.execute(new Runnable() { diff --git a/src/test/java/org/redisson/BaseTest.java b/src/test/java/org/redisson/BaseTest.java index b2a14b470..110cf3b2b 100644 --- a/src/test/java/org/redisson/BaseTest.java +++ b/src/test/java/org/redisson/BaseTest.java @@ -6,6 +6,7 @@ import java.util.Map; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.redisson.codec.SerializationCodec; public abstract class BaseTest { @@ -13,7 +14,18 @@ public abstract class BaseTest { @Before public void before() { - redisson = Redisson.create(); + this.redisson = createInstance(); + } + + public static Redisson createInstance() { + String redisAddress = System.getProperty("redisAddress"); + if (redisAddress == null) { + redisAddress = "127.0.0.1:6379"; + } + Config config = new Config(); + config.useSingleServer().setAddress(redisAddress); +// config.setCodec(new SerializationCodec()); + return Redisson.create(config); } @After diff --git a/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java b/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java index 14642c45c..ddafb7322 100644 --- a/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java +++ b/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java @@ -17,7 +17,7 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest { public void testAdd_SingleInstance() throws InterruptedException { final String name = "testAdd_SingleInstance"; - Redisson r = Redisson.create(); + Redisson r = BaseTest.createInstance(); RSortedSet map = r.getSortedSet(name); map.clear(); @@ -54,7 +54,7 @@ public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest { public void testAddNegative_SingleInstance() throws InterruptedException { final String name = "testAddNegative_SingleInstance"; - Redisson r = Redisson.create(); + Redisson r = BaseTest.createInstance(); RSortedSet map = r.getSortedSet(name); map.clear(); diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 455541695..040cd1ff7 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -152,4 +152,36 @@ public class RedissonBlockingQueueTest extends BaseTest { assertThat(counter.get(), equalTo(total)); queue.delete(); } + + @Test + public void testDrainToCollection() throws Exception { + RBlockingQueue queue1 = redisson.getBlockingQueue("queue1"); + queue1.put(1); + queue1.put(2L); + queue1.put("e"); + + ArrayList dst = new ArrayList(); + queue1.drainTo(dst); + MatcherAssert.assertThat(dst, Matchers.contains(1, 2L, "e")); + Assert.assertEquals(0, queue1.size()); + } + + @Test + public void testDrainToCollectionLimited() throws Exception { + RBlockingQueue queue1 = redisson.getBlockingQueue("queue1"); + queue1.put(1); + queue1.put(2L); + queue1.put("e"); + + ArrayList dst = new ArrayList(); + queue1.drainTo(dst, 2); + MatcherAssert.assertThat(dst, Matchers.contains(1, 2L)); + Assert.assertEquals(1, queue1.size()); + + dst.clear(); + queue1.drainTo(dst, 2); + MatcherAssert.assertThat(dst, Matchers.contains("e")); + + + } } diff --git a/src/test/java/org/redisson/RedissonConcurrentMapTest.java b/src/test/java/org/redisson/RedissonConcurrentMapTest.java index be96a8163..0243e3a27 100644 --- a/src/test/java/org/redisson/RedissonConcurrentMapTest.java +++ b/src/test/java/org/redisson/RedissonConcurrentMapTest.java @@ -13,7 +13,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { public void testSingleReplaceOldValue_SingleInstance() throws InterruptedException { final String name = "testSingleReplaceOldValue_SingleInstance"; - ConcurrentMap map = Redisson.create().getMap(name); + ConcurrentMap map = BaseTest.createInstance().getMap(name); map.put("1", "122"); testSingleInstanceConcurrency(100, new RedissonRunnable() { @@ -25,7 +25,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { } }); - ConcurrentMap testMap = Redisson.create().getMap(name); + ConcurrentMap testMap = BaseTest.createInstance().getMap(name); Assert.assertEquals("32", testMap.get("1")); assertMapSize(1, name); @@ -35,7 +35,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { public void testSingleRemoveValue_SingleInstance() throws InterruptedException { final String name = "testSingleRemoveValue_SingleInstance"; - ConcurrentMap map = Redisson.create().getMap(name); + ConcurrentMap map = BaseTest.createInstance().getMap(name); map.putIfAbsent("1", "0"); testSingleInstanceConcurrency(100, new RedissonRunnable() { @Override @@ -52,7 +52,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { public void testSingleReplace_SingleInstance() throws InterruptedException { final String name = "testSingleReplace_SingleInstance"; - ConcurrentMap map = Redisson.create().getMap(name); + ConcurrentMap map = BaseTest.createInstance().getMap(name); map.put("1", "0"); testSingleInstanceConcurrency(100, new RedissonRunnable() { @@ -63,7 +63,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { } }); - ConcurrentMap testMap = Redisson.create().getMap(name); + ConcurrentMap testMap = BaseTest.createInstance().getMap(name); Assert.assertEquals("3", testMap.get("1")); assertMapSize(1, name); @@ -73,7 +73,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { public void test_Multi_Replace_MultiInstance() throws InterruptedException { final String name = "test_Multi_Replace_MultiInstance"; - Redisson redisson = Redisson.create(); + Redisson redisson = BaseTest.createInstance(); ConcurrentMap map = redisson.getMap(name); for (int i = 0; i < 5; i++) { map.put(i, 1); @@ -88,7 +88,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { } }); - ConcurrentMap testMap = Redisson.create().getMap(name); + ConcurrentMap testMap = BaseTest.createInstance().getMap(name); for (Integer value : testMap.values()) { Assert.assertEquals(2, (int)value); } @@ -102,7 +102,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { public void test_Multi_RemoveValue_MultiInstance() throws InterruptedException { final String name = "test_Multi_RemoveValue_MultiInstance"; - ConcurrentMap map = Redisson.create().getMap(name); + ConcurrentMap map = BaseTest.createInstance().getMap(name); for (int i = 0; i < 10; i++) { map.put(i, 1); } @@ -123,7 +123,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { public void testSinglePutIfAbsent_SingleInstance() throws InterruptedException { final String name = "testSinglePutIfAbsent_SingleInstance"; - ConcurrentMap map = Redisson.create().getMap(name); + ConcurrentMap map = BaseTest.createInstance().getMap(name); map.putIfAbsent("1", "0"); testSingleInstanceConcurrency(100, new RedissonRunnable() { @Override @@ -133,7 +133,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { } }); - ConcurrentMap testMap = Redisson.create().getMap(name); + ConcurrentMap testMap = BaseTest.createInstance().getMap(name); Assert.assertEquals("0", testMap.get("1")); assertMapSize(1, name); @@ -168,7 +168,7 @@ public class RedissonConcurrentMapTest extends BaseConcurrentTest { } private void assertMapSize(int size, String name) { - Map map = Redisson.create().getMap(name); + Map map = BaseTest.createInstance().getMap(name); Assert.assertEquals(size, map.size()); clear(map); } diff --git a/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java b/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java index 632960c72..f319f7f24 100644 --- a/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java +++ b/src/test/java/org/redisson/RedissonCountDownLatchConcurrentTest.java @@ -15,7 +15,7 @@ public class RedissonCountDownLatchConcurrentTest { public void testSingleCountDownAwait_SingleInstance() throws InterruptedException { final int iterations = Runtime.getRuntime().availableProcessors()*3; - Redisson redisson = Redisson.create(); + Redisson redisson = BaseTest.createInstance(); final RCountDownLatch latch = redisson.getCountDownLatch("latch"); latch.trySetCount(iterations); diff --git a/src/test/java/org/redisson/RedissonCountDownLatchTest.java b/src/test/java/org/redisson/RedissonCountDownLatchTest.java index 841aa3aa2..5463e2db9 100644 --- a/src/test/java/org/redisson/RedissonCountDownLatchTest.java +++ b/src/test/java/org/redisson/RedissonCountDownLatchTest.java @@ -8,12 +8,10 @@ import org.junit.Assert; import org.junit.Test; import org.redisson.core.RCountDownLatch; -public class RedissonCountDownLatchTest { +public class RedissonCountDownLatchTest extends BaseTest { @Test public void testAwaitTimeout() throws InterruptedException { - Redisson redisson = Redisson.create(); - ExecutorService executor = Executors.newFixedThreadPool(2); final RCountDownLatch latch = redisson.getCountDownLatch("latch1"); @@ -48,13 +46,10 @@ public class RedissonCountDownLatchTest { executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - redisson.shutdown(); } @Test public void testAwaitTimeoutFail() throws InterruptedException { - Redisson redisson = Redisson.create(); - ExecutorService executor = Executors.newFixedThreadPool(2); final RCountDownLatch latch = redisson.getCountDownLatch("latch1"); @@ -88,15 +83,14 @@ public class RedissonCountDownLatchTest { executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - - redisson.shutdown(); } @Test public void testCountDown() throws InterruptedException { - Redisson redisson = Redisson.create(); RCountDownLatch latch = redisson.getCountDownLatch("latch"); - latch.trySetCount(1); + latch.trySetCount(2); + Assert.assertEquals(2, latch.getCount()); + latch.countDown(); Assert.assertEquals(1, latch.getCount()); latch.countDown(); Assert.assertEquals(0, latch.getCount()); @@ -131,8 +125,25 @@ public class RedissonCountDownLatchTest { latch4.countDown(); Assert.assertEquals(0, latch.getCount()); latch4.await(); + } + + @Test + public void testDelete() throws Exception { + RCountDownLatch latch = redisson.getCountDownLatch("latch"); + latch.trySetCount(1); + Assert.assertTrue(latch.delete()); + } - redisson.shutdown(); + @Test(expected = IllegalStateException.class) + public void testDeleteFailed() throws Exception { + RCountDownLatch latch = redisson.getCountDownLatch("latch"); + Assert.assertTrue(latch.delete()); } + @Test + public void testTrySetCount() throws Exception { + RCountDownLatch latch = redisson.getCountDownLatch("latch"); + Assert.assertTrue(latch.trySetCount(1)); + Assert.assertFalse(latch.trySetCount(2)); + } } diff --git a/src/test/java/org/redisson/RedissonListTest.java b/src/test/java/org/redisson/RedissonListTest.java index 7bef9f096..f35b2e193 100644 --- a/src/test/java/org/redisson/RedissonListTest.java +++ b/src/test/java/org/redisson/RedissonListTest.java @@ -3,12 +3,7 @@ package org.redisson; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; +import java.util.*; import org.hamcrest.Matchers; import org.junit.Assert; @@ -38,7 +33,7 @@ public class RedissonListTest extends BaseTest { public void operationComplete(Future future) throws Exception { list.addAsync(2L); } - }); + }).awaitUninterruptibly(); Assert.assertThat(list, Matchers.contains(1L, 2L)); } @@ -465,6 +460,44 @@ public class RedissonListTest extends BaseTest { Assert.assertTrue(list.isEmpty()); } + @Test + public void testRetainAll() { + List list = redisson.getList("list"); + list.add(1); + list.add(2); + list.add(3); + list.add(4); + list.add(5); + + Assert.assertTrue(list.retainAll(Arrays.asList(3, 2, 10, 6))); + + Assert.assertThat(list, Matchers.contains(2, 3)); + Assert.assertEquals(2, list.size()); + } + + @Test + public void testRetainAllEmpty() { + List list = redisson.getList("list"); + list.add(1); + list.add(2); + list.add(3); + list.add(4); + list.add(5); + + Assert.assertTrue(list.retainAll(Collections.emptyList())); + Assert.assertEquals(0, list.size()); + } + + @Test + public void testRetainAllNoModify() { + List list = redisson.getList("list"); + list.add(1); + list.add(2); + + Assert.assertFalse(list.retainAll(Arrays.asList(1, 2))); // nothing changed + Assert.assertThat(list, Matchers.contains(1, 2)); + } + @Test public void testAddAllIndex() { @@ -487,7 +520,7 @@ public class RedissonListTest extends BaseTest { Assert.assertThat(list, Matchers.contains(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5)); - list.addAll(0, Arrays.asList(6,7)); + list.addAll(0, Arrays.asList(6, 7)); Assert.assertThat(list, Matchers.contains(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5)); } @@ -501,7 +534,7 @@ public class RedissonListTest extends BaseTest { list.add(4); list.add(5); - list.addAll(2, Arrays.asList(7,8,9)); + list.addAll(2, Arrays.asList(7, 8, 9)); list.addAll(list.size()-1, Arrays.asList(9, 1, 9)); @@ -526,13 +559,20 @@ public class RedissonListTest extends BaseTest { list.add(4); list.add(5); - list.addAll(Arrays.asList(7,8,9)); + Assert.assertTrue(list.addAll(Arrays.asList(7, 8, 9))); - list.addAll(Arrays.asList(9, 1, 9)); + Assert.assertTrue(list.addAll(Arrays.asList(9, 1, 9))); Assert.assertThat(list, Matchers.contains(1, 2, 3, 4, 5, 7, 8, 9, 9, 1, 9)); } + @Test + public void testAddAllEmpty() throws Exception { + List list = redisson.getList("list"); + Assert.assertFalse(list.addAll(Collections.emptyList())); + Assert.assertEquals(0, list.size()); + } + @Test public void testContainsAll() { List list = redisson.getList("list"); @@ -553,10 +593,10 @@ public class RedissonListTest extends BaseTest { list.add("5"); list.add("3"); - Assert.assertArrayEquals(list.toArray(), new Object[] {"1", "4", "2", "5", "3"}); + Assert.assertArrayEquals(list.toArray(), new Object[]{"1", "4", "2", "5", "3"}); String[] strs = list.toArray(new String[0]); - Assert.assertArrayEquals(strs, new String[] {"1", "4", "2", "5", "3"}); + Assert.assertArrayEquals(strs, new String[]{"1", "4", "2", "5", "3"}); } @@ -694,4 +734,14 @@ public class RedissonListTest extends BaseTest { Assert.assertThat(list, Matchers.contains("1", "3", "5", "6")); } + @Test + public void testCodec() { + List list = redisson.getList("list"); + list.add(1); + list.add(2L); + list.add("3"); + list.add("e"); + + Assert.assertThat(list, Matchers.contains(1, 2L, "3", "e")); + } } diff --git a/src/test/java/org/redisson/RedissonLockTest.java b/src/test/java/org/redisson/RedissonLockTest.java index 608fa2b94..90242073a 100644 --- a/src/test/java/org/redisson/RedissonLockTest.java +++ b/src/test/java/org/redisson/RedissonLockTest.java @@ -17,7 +17,7 @@ public class RedissonLockTest extends BaseConcurrentTest { @Before public void before() { - redisson = Redisson.create(); + redisson = BaseTest.createInstance(); } @After @@ -51,6 +51,23 @@ public class RedissonLockTest extends BaseConcurrentTest { lock.unlock(); } + @Test + public void testAutoExpire() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + testSingleInstanceConcurrency(1, new RedissonRunnable() { + @Override + public void run(Redisson redisson) { + RLock lock = redisson.getLock("lock"); + lock.lock(); + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); + RLock lock = redisson.getLock("lock"); + Thread.sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS + 1)); + Assert.assertFalse("Transient lock expired automatically", lock.isLocked()); + } @Test public void testGetHoldCount() { @@ -166,11 +183,22 @@ public class RedissonLockTest extends BaseConcurrentTest { } @Test - public void testReentrancy() { + public void testReentrancy() throws InterruptedException { Lock lock = redisson.getLock("lock1"); - lock.lock(); - lock.lock(); + Assert.assertTrue(lock.tryLock()); + Assert.assertTrue(lock.tryLock()); lock.unlock(); + // next row for test renew expiration tisk. + //Thread.currentThread().sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS*2)); + Thread thread1 = new Thread() { + @Override + public void run() { + RLock lock1 = (RedissonLock) redisson.getLock("lock1"); + Assert.assertFalse(lock1.tryLock()); + } + }; + thread1.start(); + thread1.join(); lock.unlock(); } @@ -187,7 +215,7 @@ public class RedissonLockTest extends BaseConcurrentTest { System.out.println("lock1 " + Thread.currentThread().getId()); lock.lock(); System.out.println("lock2 "+ Thread.currentThread().getId()); - lockedCounter.set(lockedCounter.get() + 1); + lockedCounter.incrementAndGet(); System.out.println("lockedCounter " + lockedCounter); System.out.println("unlock1 "+ Thread.currentThread().getId()); lock.unlock(); @@ -213,7 +241,7 @@ public class RedissonLockTest extends BaseConcurrentTest { } catch (InterruptedException e) { e.printStackTrace(); } - lockedCounter.set(lockedCounter.get() + 1); + lockedCounter.incrementAndGet(); redisson.getLock("testConcurrency_MultiInstance1").unlock(); } } @@ -232,7 +260,7 @@ public class RedissonLockTest extends BaseConcurrentTest { public void run(Redisson redisson) { Lock lock = redisson.getLock("testConcurrency_MultiInstance2"); lock.lock(); - lockedCounter.set(lockedCounter.get() + 1); + lockedCounter.incrementAndGet(); lock.unlock(); } }); diff --git a/src/test/java/org/redisson/RedissonMapTest.java b/src/test/java/org/redisson/RedissonMapTest.java index 3a81350df..d6c90488f 100644 --- a/src/test/java/org/redisson/RedissonMapTest.java +++ b/src/test/java/org/redisson/RedissonMapTest.java @@ -388,6 +388,22 @@ public class RedissonMapTest extends BaseTest { Assert.assertEquals("6", val2.getValue()); } + @Test + public void testPutIfAbsent() throws Exception { + ConcurrentMap map = redisson.getMap("simple"); + SimpleKey key = new SimpleKey("1"); + SimpleValue value = new SimpleValue("2"); + map.put(key, value); + Assert.assertEquals(value, map.putIfAbsent(key, new SimpleValue("3"))); + Assert.assertEquals(value, map.get(key)); + + SimpleKey key1 = new SimpleKey("2"); + SimpleValue value1 = new SimpleValue("4"); + Assert.assertNull(map.putIfAbsent(key1, value1)); + Assert.assertEquals(value1, map.get(key1)); + + } + @Test public void testSize() { Map map = redisson.getMap("simple"); diff --git a/src/test/java/org/redisson/RedissonSetTest.java b/src/test/java/org/redisson/RedissonSetTest.java index 4bf0f0534..bd2154aa8 100644 --- a/src/test/java/org/redisson/RedissonSetTest.java +++ b/src/test/java/org/redisson/RedissonSetTest.java @@ -2,13 +2,8 @@ package org.redisson; import io.netty.util.concurrent.Future; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; -import java.util.Set; +import java.io.Serializable; +import java.util.*; import java.util.concurrent.ExecutionException; import org.hamcrest.MatcherAssert; @@ -20,7 +15,7 @@ import org.redisson.core.RSortedSet; public class RedissonSetTest extends BaseTest { - public static class SimpleBean { + public static class SimpleBean implements Serializable { private Long lng; @@ -158,14 +153,37 @@ public class RedissonSetTest extends BaseTest { @Test public void testRetainAll() { Set set = redisson.getSet("set"); - for (int i = 0; i < 200; i++) { + for (int i = 0; i < 20000; i++) { set.add(i); } Assert.assertTrue(set.retainAll(Arrays.asList(1, 2))); + Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2)); Assert.assertEquals(2, set.size()); } +// @Test +// public void testIteratorRemoveHighVolume() { +// Set set = redisson.getSet("set") /*new HashSet()*/; +// for (int i = 0; i < 120000; i++) { +// set.add(i); +// } +// int cnt = 0; +// Iterator iterator = set.iterator(); +// while (iterator.hasNext()) { +// Integer integer = iterator.next(); +// if (integer > -1) { // always +// iterator.remove(); +// } +// cnt++; +// } +// System.out.println("-----------"); +// for (Integer integer : set) { +// System.out.println(integer); +// } +// Assert.assertEquals(20000, cnt); +// } + @Test public void testContainsAll() { Set set = redisson.getSet("set"); @@ -234,4 +252,27 @@ public class RedissonSetTest extends BaseTest { Assert.assertEquals(5, set.size()); } + + @Test + public void testRetainAllEmpty() { + Set set = redisson.getSet("set"); + set.add(1); + set.add(2); + set.add(3); + set.add(4); + set.add(5); + + Assert.assertTrue(set.retainAll(Collections.emptyList())); + Assert.assertEquals(0, set.size()); + } + + @Test + public void testRetainAllNoModify() { + Set set = redisson.getSet("set"); + set.add(1); + set.add(2); + + Assert.assertFalse(set.retainAll(Arrays.asList(1, 2))); // nothing changed + Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2)); + } } diff --git a/src/test/java/org/redisson/RedissonTopicTest.java b/src/test/java/org/redisson/RedissonTopicTest.java index af812aaae..f9178fd2b 100644 --- a/src/test/java/org/redisson/RedissonTopicTest.java +++ b/src/test/java/org/redisson/RedissonTopicTest.java @@ -1,5 +1,6 @@ package org.redisson; +import java.io.Serializable; import java.util.concurrent.CountDownLatch; import org.junit.Assert; @@ -9,7 +10,7 @@ import org.redisson.core.RTopic; public class RedissonTopicTest { - public static class Message { + public static class Message implements Serializable { private String name; @@ -47,7 +48,7 @@ public class RedissonTopicTest { public void testUnsubscribe() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1); - Redisson redisson = Redisson.create(); + Redisson redisson = BaseTest.createInstance(); RTopic topic1 = redisson.getTopic("topic1"); int listenerId = topic1.addListener(new MessageListener() { @Override @@ -77,7 +78,7 @@ public class RedissonTopicTest { public void testLazyUnsubscribe() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1); - Redisson redisson1 = Redisson.create(); + Redisson redisson1 = BaseTest.createInstance(); RTopic topic1 = redisson1.getTopic("topic"); int listenerId = topic1.addListener(new MessageListener() { @Override @@ -89,7 +90,7 @@ public class RedissonTopicTest { topic1.removeListener(listenerId); Thread.sleep(1000); - Redisson redisson2 = Redisson.create(); + Redisson redisson2 = BaseTest.createInstance(); RTopic topic2 = redisson2.getTopic("topic"); topic2.addListener(new MessageListener() { @Override @@ -111,7 +112,7 @@ public class RedissonTopicTest { public void test() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(2); - Redisson redisson1 = Redisson.create(); + Redisson redisson1 = BaseTest.createInstance(); RTopic topic1 = redisson1.getTopic("topic"); topic1.addListener(new MessageListener() { @Override @@ -121,7 +122,7 @@ public class RedissonTopicTest { } }); - Redisson redisson2 = Redisson.create(); + Redisson redisson2 = BaseTest.createInstance(); RTopic topic2 = redisson2.getTopic("topic"); topic2.addListener(new MessageListener() { @Override @@ -140,7 +141,7 @@ public class RedissonTopicTest { @Test public void testListenerRemove() throws InterruptedException { - Redisson redisson1 = Redisson.create(); + Redisson redisson1 = BaseTest.createInstance(); RTopic topic1 = redisson1.getTopic("topic"); int id = topic1.addListener(new MessageListener() { @Override @@ -149,7 +150,7 @@ public class RedissonTopicTest { } }); - Redisson redisson2 = Redisson.create(); + Redisson redisson2 = BaseTest.createInstance(); RTopic topic2 = redisson2.getTopic("topic"); topic1.removeListener(id); topic2.publish(new Message("123")); diff --git a/src/test/java/org/redisson/TestObject.java b/src/test/java/org/redisson/TestObject.java index a062c5d33..083081a40 100644 --- a/src/test/java/org/redisson/TestObject.java +++ b/src/test/java/org/redisson/TestObject.java @@ -1,6 +1,8 @@ package org.redisson; -public class TestObject implements Comparable { +import java.io.Serializable; + +public class TestObject implements Comparable, Serializable { private String name; private String value; From 758c37bce5aece2055aed91d0400bcd80689057e Mon Sep 17 00:00:00 2001 From: Andrew Kolpakov Date: Fri, 26 Jun 2015 18:36:18 +0600 Subject: [PATCH 2/2] encode binary data in operation debug output --- .../java/com/lambdaworks/redis/protocol/CommandHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index c932e347f..f7598e0fa 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -48,7 +48,7 @@ public class CommandHandler extends ChannelDuplexHandler { try { if (!input.isReadable()) return; -// System.out.println("in: " + input.toString(CharsetUtil.UTF_8)); +// System.out.println("in: " + toHexString(input)); buffer.discardReadBytes(); buffer.writeBytes(input); @@ -64,7 +64,6 @@ public class CommandHandler extends ChannelDuplexHandler { Command cmd = (Command) msg; ByteBuf buf = ctx.alloc().heapBuffer(); cmd.encode(buf); -// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); // System.out.println("out: " + toHexString(buf)); ctx.write(buf, promise);