diff --git a/src/main/java/org/redisson/RedissonAtomicLong.java b/src/main/java/org/redisson/RedissonAtomicLong.java index ecd5b6c44..fc4331ba3 100644 --- a/src/main/java/org/redisson/RedissonAtomicLong.java +++ b/src/main/java/org/redisson/RedissonAtomicLong.java @@ -41,7 +41,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong @Override public boolean compareAndSet(long expect, long update) { - return connectionManager.eval(StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return connectionManager.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('get', KEYS[1]) == ARGV[1] then " + "redis.call('set', KEYS[1], ARGV[2]); " + "return true " @@ -62,7 +62,8 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong @Override public long getAndAdd(long delta) { - return connectionManager.eval(StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + return connectionManager.evalWrite(getName(), + StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER, "local v = redis.call('get', KEYS[1]) or 0; " + "redis.call('set', KEYS[1], v + ARGV[1]); " + "return tonumber(v)", @@ -70,8 +71,9 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong } @Override - public long getAndSet(final long newValue) { - return connectionManager.eval(StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + public long getAndSet(long newValue) { + return connectionManager.evalWrite(getName(), + StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER, "local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)", Collections.singletonList(getName()), newValue); } diff --git a/src/main/java/org/redisson/RedissonBlockingQueue.java b/src/main/java/org/redisson/RedissonBlockingQueue.java index f3ce1bbab..be6de3375 100644 --- a/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -93,7 +93,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock throw new NullPointerException(); } - List list = connectionManager.eval(RedisCommands.EVAL_LIST, + List list = connectionManager.evalWrite(getName(), RedisCommands.EVAL_LIST, "local vals = redis.call('lrange', KEYS[1], 0, -1); " + "redis.call('ltrim', KEYS[1], -1, 0); " + "return vals", Collections.singletonList(getName())); @@ -110,7 +110,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock throw new NullPointerException(); } - List list = connectionManager.eval(RedisCommands.EVAL_LIST, + List list = connectionManager.evalWrite(getName(), RedisCommands.EVAL_LIST, "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " + "redis.call('ltrim', KEYS[1], elemNum + 1, -1); " + diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index e6856f11f..22cb83737 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -203,7 +203,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return; } - connectionManager.eval(RedisCommands.EVAL_BOOLEAN, + connectionManager.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN, "local v = redis.call('decr', KEYS[1]);" + "if v <= 0 then redis.call('del', KEYS[1]) end;" + "if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" + @@ -234,14 +234,14 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public boolean trySetCount(long count) { - return connectionManager.eval(RedisCommands.EVAL_BOOLEAN, + return connectionManager.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[2]); redis.call('publish', ARGV[3], ARGV[1]); return true else return false end", Collections.singletonList(getName()), newCountMessage, count, getChannelName()); } @Override public Future deleteAsync() { - return connectionManager.evalAsync(RedisCommands.EVAL_BOOLEAN, + return connectionManager.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN, "if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end", Collections.singletonList(getName()), newCountMessage, getChannelName()); } diff --git a/src/main/java/org/redisson/RedissonHyperLogLog.java b/src/main/java/org/redisson/RedissonHyperLogLog.java index 89cf86de5..dbfcc8139 100644 --- a/src/main/java/org/redisson/RedissonHyperLogLog.java +++ b/src/main/java/org/redisson/RedissonHyperLogLog.java @@ -72,7 +72,7 @@ public class RedissonHyperLogLog extends RedissonObject implements RHyperLogL @Override public Future countAsync() { - return connectionManager.readAsync(getName(), RedisCommands.PFCOUNT, getName()); + return connectionManager.writeAsync(getName(), RedisCommands.PFCOUNT, getName()); } @Override @@ -80,7 +80,7 @@ public class RedissonHyperLogLog extends RedissonObject implements RHyperLogL List args = new ArrayList(otherLogNames.length + 1); args.add(getName()); args.addAll(Arrays.asList(otherLogNames)); - return connectionManager.readAsync(getName(), RedisCommands.PFCOUNT, args.toArray()); + return connectionManager.writeAsync(getName(), RedisCommands.PFCOUNT, args.toArray()); } @Override diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index c1be924a3..0152e0a08 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -25,7 +25,7 @@ import java.util.NoSuchElementException; import org.redisson.client.protocol.BooleanReplayConvertor; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; +import static org.redisson.client.protocol.RedisCommands.*; import org.redisson.connection.ConnectionManager; import org.redisson.core.RList; @@ -50,7 +50,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public int size() { - Long size = connectionManager.read(getName(), RedisCommands.LLEN, getName()); + Long size = connectionManager.read(getName(), LLEN, getName()); return size.intValue(); } @@ -76,7 +76,7 @@ public class RedissonList extends RedissonExpirable implements RList { } protected List readAllList() { - return connectionManager.read(getName(), RedisCommands.LRANGE, getName(), 0, -1); + return connectionManager.read(getName(), LRANGE, getName(), 0, -1); } @Override @@ -101,7 +101,7 @@ public class RedissonList extends RedissonExpirable implements RList { } protected boolean remove(Object o, int count) { - return (Long)connectionManager.write(getName(), RedisCommands.LREM, getName(), count, o) > 0; + return (Long)connectionManager.write(getName(), LREM, getName(), count, o) > 0; } @Override @@ -114,7 +114,7 @@ public class RedissonList extends RedissonExpirable implements RList { int to = div(size(), batchSize); for (int i = 0; i < to; i++) { final int j = i; - List range = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), j*batchSize, j*batchSize + batchSize - 1); + List range = connectionManager.read(getName(), LRANGE, getName(), j*batchSize, j*batchSize + batchSize - 1); for (Iterator iterator = copy.iterator(); iterator.hasNext();) { Object obj = iterator.next(); int index = range.indexOf(obj); @@ -143,7 +143,7 @@ public class RedissonList extends RedissonExpirable implements RList { List args = new ArrayList(c.size() + 1); args.add(getName()); args.addAll(c); - Future res = connectionManager.writeAsync(getName(), RedisCommands.RPUSH, args.toArray()); + Future res = connectionManager.writeAsync(getName(), RPUSH, args.toArray()); res.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -171,7 +171,7 @@ public class RedissonList extends RedissonExpirable implements RList { Collections.reverse(elements); elements.add(0, getName()); - Long newSize = connectionManager.write(getName(), RedisCommands.LPUSH, elements.toArray()); + Long newSize = connectionManager.write(getName(), LPUSH, elements.toArray()); return newSize != size; } @@ -180,7 +180,7 @@ public class RedissonList extends RedissonExpirable implements RList { List args = new ArrayList(coll.size() + 1); args.add(index); args.addAll(coll); - return connectionManager.eval(new RedisCommand("EVAL", new BooleanReplayConvertor(), 5), + return connectionManager.evalWrite(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 5), "local ind = table.remove(ARGV, 1); " + // index is the first parameter "local tail = redis.call('lrange', KEYS[1], ind, -1); " + "redis.call('ltrim', KEYS[1], 0, ind - 1); " + @@ -212,7 +212,7 @@ public class RedissonList extends RedissonExpirable implements RList { boolean result = false; for (Object object : c) { - boolean res = (Long)connectionManager.write(getName(), RedisCommands.LREM, getName(), 0, object) > 0; + boolean res = (Long)connectionManager.write(getName(), LREM, getName(), 0, object) > 0; if (!result) { result = res; } @@ -240,7 +240,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future getAsync(int index) { - return connectionManager.readAsync(getName(), RedisCommands.LINDEX, getName(), index); + return connectionManager.readAsync(getName(), LINDEX, getName(), index); } @Override @@ -278,7 +278,7 @@ public class RedissonList extends RedissonExpirable implements RList { public V set(int index, V element) { checkIndex(index); - return connectionManager.eval(new RedisCommand("EVAL", 5), + return connectionManager.evalWrite(getName(), new RedisCommand("EVAL", 5), "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + "return v", @@ -306,10 +306,10 @@ public class RedissonList extends RedissonExpirable implements RList { checkIndex(index); if (index == 0) { - return connectionManager.write(getName(), RedisCommands.LPOP, getName()); + return connectionManager.write(getName(), LPOP, getName()); } - return connectionManager.eval(RedisCommands.EVAL_OBJECT, + return connectionManager.evalWrite(getName(), EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "local tail = redis.call('lrange', KEYS[1], ARGV[1]);" + "redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" + @@ -324,7 +324,7 @@ public class RedissonList extends RedissonExpirable implements RList { return -1; } - Long index = connectionManager.eval(new RedisCommand("EVAL", 4), + Long index = connectionManager.evalRead(getName(), new RedisCommand("EVAL", 4), "local s = redis.call('llen', KEYS[1]);" + "for i = 0, s, 1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + "return -1", @@ -338,7 +338,7 @@ public class RedissonList extends RedissonExpirable implements RList { return -1; } - return ((Long)connectionManager.eval(new RedisCommand("EVAL", 4), + return ((Long)connectionManager.evalRead(getName(), new RedisCommand("EVAL", 4), "local s = redis.call('llen', KEYS[1]);" + "for i = s, 0, -1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + "return -1", @@ -455,7 +455,7 @@ public class RedissonList extends RedissonExpirable implements RList { throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex); } - return connectionManager.read(getName(), RedisCommands.LRANGE, getName(), fromIndex, toIndex - 1); + return connectionManager.read(getName(), LRANGE, getName(), fromIndex, toIndex - 1); } public String toString() { diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index b56b0364c..d7b8b5024 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -267,7 +267,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { private Long tryLockInner(final long leaseTime, final TimeUnit unit) { internalLockLeaseTime = unit.toMillis(leaseTime); - return connectionManager.eval(RedisCommands.EVAL_INTEGER, + return connectionManager.evalWrite(getName(), RedisCommands.EVAL_INTEGER, "local v = redis.call('get', KEYS[1]); " + "if (v == false) then " + " redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " + @@ -342,7 +342,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void unlock() { - Boolean opStatus = connectionManager.eval(RedisCommands.EVAL_BOOLEAN, + Boolean opStatus = connectionManager.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN, "local v = redis.call('get', KEYS[1]); " + "if (v == false) then " + " redis.call('publish', ARGV[4], ARGV[2]); " + @@ -385,7 +385,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { private Future forceUnlockAsync() { stopRefreshTask(); - return connectionManager.evalAsync(RedisCommands.EVAL_BOOLEAN, + return connectionManager.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN, "redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true", Collections.singletonList(getName()), unlockMessage, getChannelName()); } @@ -397,7 +397,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public boolean isHeldByCurrentThread() { - Boolean opStatus = connectionManager.eval(RedisCommands.EVAL_BOOLEAN, + Boolean opStatus = connectionManager.evalRead(getName(), RedisCommands.EVAL_BOOLEAN, "local v = redis.call('get', KEYS[1]); " + "if (v == false) then " + " return false; " + @@ -415,7 +415,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public int getHoldCount() { - Long opStatus = connectionManager.eval(RedisCommands.EVAL_INTEGER, + Long opStatus = connectionManager.evalRead(getName(), RedisCommands.EVAL_INTEGER, "local v = redis.call('get', KEYS[1]); " + "if (v == false) then " + " return 0; " + @@ -423,7 +423,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { " local o = cjson.decode(v); " + " return o['c']; " + "end", - Collections.singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId()); + Collections.singletonList(getName())); return opStatus.intValue(); } diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index b62329b0a..f2e807021 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -155,7 +155,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future putIfAbsentAsync(K key, V value) { - return connectionManager.evalAsync(EVAL_PUT, + return connectionManager.evalWriteAsync(getName(), EVAL_PUT, "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return nil else return redis.call('hget', KEYS[1], ARGV[1]) end", Collections.singletonList(getName()), key, value); } @@ -167,7 +167,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future removeAsync(Object key, Object value) { - return connectionManager.evalAsync(new RedisCommand("EVAL", new LongReplayConvertor(), 4, ValueType.MAP), + return connectionManager.evalWriteAsync(getName(), + new RedisCommand("EVAL", new LongReplayConvertor(), 4, ValueType.MAP), "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then return redis.call('hdel', KEYS[1], ARGV[1]) else return 0 end", Collections.singletonList(getName()), key, value); } @@ -179,7 +180,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future replaceAsync(K key, V oldValue, V newValue) { - return connectionManager.evalAsync(new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, + return connectionManager.evalWriteAsync(getName(), + new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)), "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); return true; else return false; end", Collections.singletonList(getName()), key, oldValue, newValue); @@ -192,7 +194,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future replaceAsync(K key, V value) { - return connectionManager.evalAsync(new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE), + return connectionManager.evalWriteAsync(getName(), + new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE), "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v; else return nil; end", Collections.singletonList(getName()), key, value); } @@ -204,7 +207,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future putAsync(K key, V value) { - return connectionManager.evalAsync(EVAL_PUT, + return connectionManager.evalWriteAsync(getName(), EVAL_PUT, "local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v", Collections.singletonList(getName()), key, value); } @@ -212,7 +215,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future removeAsync(K key) { - return connectionManager.evalAsync(new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE), + return connectionManager.evalWriteAsync(getName(), + new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE), "local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[1], ARGV[1]); return v", Collections.singletonList(getName()), key); } diff --git a/src/main/java/org/redisson/RedissonSortedSet.java b/src/main/java/org/redisson/RedissonSortedSet.java index b9ee4bc0f..72be69e9f 100644 --- a/src/main/java/org/redisson/RedissonSortedSet.java +++ b/src/main/java/org/redisson/RedissonSortedSet.java @@ -603,7 +603,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSetasList(getName(), getComparatorKeyName()), comparatorSign); diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index 004c00dbc..8d117d7e5 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -15,6 +15,7 @@ */ package org.redisson.client; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import org.redisson.client.handler.CommandData; @@ -60,19 +61,19 @@ public class RedisPubSubConnection extends RedisConnection { } } - public Future subscribe(Codec codec, String ... channel) { + public Future> subscribe(Codec codec, String ... channel) { return async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel); } - public Future psubscribe(Codec codec, String ... channel) { + public Future> psubscribe(Codec codec, String ... channel) { return async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel); } - public Future unsubscribe(String ... channel) { + public Future> unsubscribe(String ... channel) { return async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel); } - public Future punsubscribe(String ... channel) { + public Future> punsubscribe(String ... channel) { return async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel); } diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 3a1dbf2af..1e36fea86 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -31,6 +31,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubPatternMessage; +import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,7 +127,20 @@ public class CommandDecoder extends ReplayingDecoder { } Object result = messageDecoder(data, respParts).decode(respParts); - handleMultiResult(data, parts, channel, result); + if (result instanceof PubSubStatusMessage) { + if (parts == null) { + parts = new ArrayList(); + } + parts.add(result); + // has next status messages + if (in.writerIndex() > in.readerIndex()) { + decode(in, data, parts, channel, currentDecoder); + } else { + handleMultiResult(data, null, channel, parts); + } + } else { + handleMultiResult(data, parts, channel, result); + } } else { throw new IllegalStateException("Can't decode replay " + (char)code); } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 8d948212e..27d707fdf 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -201,4 +201,9 @@ public class RedisCommand { return outParamType; } + @Override + public String toString() { + return "RedisCommand [name=" + name + ", subName=" + subName + "]"; + } + } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index d14a2a62e..feae3588a 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -126,10 +126,10 @@ public interface RedisCommands { RedisCommand PUBLISH = new RedisCommand("PUBLISH", 2); - RedisStrictCommand SUBSCRIBE = new RedisStrictCommand("SUBSCRIBE", new PubSubStatusDecoder()); - RedisStrictCommand UNSUBSCRIBE = new RedisStrictCommand("UNSUBSCRIBE", new PubSubStatusDecoder()); - RedisStrictCommand PSUBSCRIBE = new RedisStrictCommand("PSUBSCRIBE", new PubSubStatusDecoder()); - RedisStrictCommand PUNSUBSCRIBE = new RedisStrictCommand("PUNSUBSCRIBE", new PubSubStatusDecoder()); + RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", new PubSubStatusDecoder()); + RedisCommand UNSUBSCRIBE = new RedisCommand("UNSUBSCRIBE", new PubSubStatusDecoder()); + RedisCommand PSUBSCRIBE = new RedisCommand("PSUBSCRIBE", new PubSubStatusDecoder()); + RedisCommand PUNSUBSCRIBE = new RedisCommand("PUNSUBSCRIBE", new PubSubStatusDecoder()); RedisStrictCommand CLUSTER_NODES = new RedisStrictCommand("CLUSTER", "NODES", new StringDataDecoder()); diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java index bdd78c90f..81adecf92 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java @@ -15,7 +15,6 @@ */ package org.redisson.client.protocol.pubsub; -import java.util.ArrayList; import java.util.List; import org.redisson.client.protocol.decoder.MultiDecoder; @@ -23,7 +22,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; -public class PubSubStatusDecoder implements MultiDecoder { +public class PubSubStatusDecoder implements MultiDecoder { @Override public Object decode(ByteBuf buf) { @@ -34,11 +33,7 @@ public class PubSubStatusDecoder implements MultiDecoder { @Override public PubSubStatusMessage decode(List parts) { - List channels = new ArrayList(); - for (Object part : parts.subList(1, parts.size()-1)) { - channels.add(part.toString()); - } - return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), channels); + return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString()); } @Override diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusMessage.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusMessage.java index fb20f91e2..f271cbf56 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusMessage.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusMessage.java @@ -15,23 +15,21 @@ */ package org.redisson.client.protocol.pubsub; -import java.util.List; - public class PubSubStatusMessage { public enum Type {SUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, UNSUBSCRIBE} private final Type type; - private final List channels; + private final String channel; - public PubSubStatusMessage(Type type, List channels) { + public PubSubStatusMessage(Type type, String channel) { super(); this.type = type; - this.channels = channels; + this.channel = channel; } - public List getChannels() { - return channels; + public String getChannel() { + return channel; } public Type getType() { @@ -40,7 +38,7 @@ public class PubSubStatusMessage { @Override public String toString() { - return "PubSubStatusMessage [type=" + type + ", channels=" + channels + "]"; + return "PubSubStatusMessage [type=" + type + ", channels=" + channel + "]"; } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 2f034f181..e89536e9c 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -40,6 +40,23 @@ import io.netty.util.concurrent.Future; //TODO ping support public interface ConnectionManager { + Future evalReadAsync(String key, RedisCommand evalCommandType, String script, List keys, Object ... params); + + Future evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + + R evalRead(String key, RedisCommand evalCommandType, String script, List keys, Object ... params); + + R evalRead(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + + Future evalWriteAsync(String key, RedisCommand evalCommandType, String script, List keys, Object ... params); + + Future evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + + R evalWrite(String key, RedisCommand evalCommandType, String script, List keys, Object ... params); + + R evalWrite(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + + R read(String key, SyncOperation operation); R write(String key, SyncOperation operation); @@ -52,8 +69,6 @@ public interface ConnectionManager { Future writeAsync(Codec codec, RedisCommand command, Object ... params); - R eval(RedisCommand evalCommandType, String script, List keys, Object ... params); - Future evalAsync(RedisCommand evalCommandType, String script, List keys, Object ... params); Future evalAsync(Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); @@ -64,8 +79,6 @@ public interface ConnectionManager { R read(String key, RedisCommand command, Object ... params); - R eval(Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - Future writeAsync(String key, Codec codec, RedisCommand command, Object ... params); R write(String key, Codec codec, RedisCommand command, Object ... params); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 6ac893733..728bec26b 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -317,35 +317,75 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - public R write(String key, RedisCommand command, Object ... params) { - Future res = writeAsync(key, command, params); + public Future evalReadAsync(String key, RedisCommand evalCommandType, String script, List keys, Object ... params) { + return evalReadAsync(key, codec, evalCommandType, script, keys, params); + } + + public Future evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + Promise mainPromise = getGroup().next().newPromise(); + List args = new ArrayList(2 + keys.size() + params.length); + args.add(script); + args.add(keys.size()); + args.addAll(keys); + args.addAll(Arrays.asList(params)); + int slot = calcSlot(key); + async(true, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0); + return mainPromise; + } + + public R evalRead(String key, RedisCommand evalCommandType, String script, List keys, Object ... params) { + return evalRead(key, codec, evalCommandType, script, keys, params); + } + + public R evalRead(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + Future res = evalReadAsync(key, codec, evalCommandType, script, keys, params); return get(res); } - public Future evalAsync(RedisCommand evalCommandType, String script, List keys, Object ... params) { - return evalAsync(codec, evalCommandType, script, keys, params); + public Future evalWriteAsync(String key, RedisCommand evalCommandType, String script, List keys, Object ... params) { + return evalWriteAsync(key, codec, evalCommandType, script, keys, params); } - public Future evalAsync(Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + public Future evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { Promise mainPromise = getGroup().next().newPromise(); List args = new ArrayList(2 + keys.size() + params.length); args.add(script); args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(false, -1, null, codec, evalCommandType, args.toArray(), mainPromise, 0); + int slot = calcSlot(key); + async(false, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0); return mainPromise; } - public R eval(RedisCommand evalCommandType, String script, List keys, Object ... params) { - return eval(codec, evalCommandType, script, keys, params); + public R evalWrite(String key, RedisCommand evalCommandType, String script, List keys, Object ... params) { + return evalWrite(key, codec, evalCommandType, script, keys, params); } - public R eval(Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { - Future res = evalAsync(codec, evalCommandType, script, keys, params); + public R evalWrite(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + Future res = evalWriteAsync(key, codec, evalCommandType, script, keys, params); + return get(res); + } + + public R write(String key, RedisCommand command, Object ... params) { + Future res = writeAsync(key, command, params); return get(res); } + public Future evalAsync(RedisCommand evalCommandType, String script, List keys, Object ... params) { + return evalAsync(codec, evalCommandType, script, keys, params); + } + + public Future evalAsync(Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + Promise mainPromise = getGroup().next().newPromise(); + List args = new ArrayList(2 + keys.size() + params.length); + args.add(script); + args.add(keys.size()); + args.addAll(keys); + args.addAll(Arrays.asList(params)); + async(false, -1, null, codec, evalCommandType, args.toArray(), mainPromise, 0); + return mainPromise; + } public Future writeAsync(String key, RedisCommand command, Object ... params) { return writeAsync(key, codec, command, params); @@ -416,7 +456,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } else { connection = connectionWriteOp(slot); } - log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr()); + log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr()); connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); ex.set(new RedisTimeoutException()); @@ -559,7 +599,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Future subscribe(RedisPubSubListener listener, String channelName) { + public Future> subscribe(RedisPubSubListener listener, String channelName) { PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { return сonnEntry.subscribe(codec, listener, channelName); @@ -571,7 +611,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { entry.release(); - return group.next().newSucceededFuture(new PubSubStatusMessage(Type.SUBSCRIBE, Arrays.asList(channelName))); + return group.next().newSucceededFuture(Arrays.asList(new PubSubStatusMessage(Type.SUBSCRIBE, channelName))); } synchronized (entry) { if (!entry.isActive()) { @@ -591,7 +631,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { returnSubscribeConnection(slot, entry); - return group.next().newSucceededFuture(new PubSubStatusMessage(Type.SUBSCRIBE, Arrays.asList(channelName))); + return group.next().newSucceededFuture(Arrays.asList(new PubSubStatusMessage(Type.SUBSCRIBE, channelName))); } synchronized (entry) { if (!entry.isActive()) { @@ -603,16 +643,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Future unsubscribe(String channelName) { + public Future> unsubscribe(String channelName) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { return group.next().newSucceededFuture(null); } - Future future = entry.unsubscribe(channelName); - future.addListener(new FutureListener() { + Future> future = entry.unsubscribe(channelName); + future.addListener(new FutureListener>() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future> future) throws Exception { synchronized (entry) { if (entry.tryClose()) { returnSubscribeConnection(-1, entry); @@ -624,16 +664,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Future punsubscribe(String channelName) { + public Future> punsubscribe(String channelName) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { return group.next().newSucceededFuture(null); } - Future future = entry.punsubscribe(channelName); - future.addListener(new FutureListener() { + Future> future = entry.punsubscribe(channelName); + future.addListener(new FutureListener>() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future> future) throws Exception { synchronized (entry) { if (entry.tryClose()) { returnSubscribeConnection(-1, entry); diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index e1e812b46..34d6912f1 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -21,6 +21,7 @@ import io.netty.util.concurrent.FutureListener; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -124,31 +125,33 @@ public class PubSubConnectionEntry { } public void subscribe(Codec codec, final String channelName) { - Future result = conn.subscribe(codec, channelName); - result.addListener(new FutureListener() { + Future> result = conn.subscribe(codec, channelName); + result.addListener(new FutureListener>() { @Override - public void operationComplete(Future future) throws Exception { - log.debug("subscribed to '{}' channel on server '{}'", channelName, conn.getRedisClient().getAddr()); + public void operationComplete(Future> future) throws Exception { + if (future.isSuccess()) { + log.debug("subscribed to '{}' channel on server '{}'", channelName, conn.getRedisClient().getAddr()); + } } }); } public void psubscribe(Codec codec, final String pattern) { - Future result = conn.psubscribe(codec, pattern); - result.addListener(new FutureListener() { + Future> result = conn.psubscribe(codec, pattern); + result.addListener(new FutureListener>() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future> future) throws Exception { log.debug("punsubscribed from '{}' pattern on server '{}'", pattern, conn.getRedisClient().getAddr()); } }); } - public Future subscribe(Codec codec, RedisPubSubListener listener, String channel) { + public Future> subscribe(Codec codec, RedisPubSubListener listener, String channel) { addListener(channel, listener); return conn.subscribe(codec, channel); } - public Future unsubscribe(final String channel) { + public Future> unsubscribe(final String channel) { Queue listeners = channelListeners.get(channel); if (listeners != null) { for (RedisPubSubListener listener : listeners) { @@ -156,17 +159,17 @@ public class PubSubConnectionEntry { } } - Future future = conn.unsubscribe(channel); - future.addListener(new FutureListener() { + Future> future = conn.unsubscribe(channel); + future.addListener(new FutureListener>() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future> future) throws Exception { subscribedChannelsAmount.release(); } }); return future; } - public Future punsubscribe(final String channel) { + public Future> punsubscribe(final String channel) { Queue listeners = channelListeners.get(channel); if (listeners != null) { for (RedisPubSubListener listener : listeners) { @@ -174,10 +177,10 @@ public class PubSubConnectionEntry { } } - Future future = conn.punsubscribe(channel); - future.addListener(new FutureListener() { + Future> future = conn.punsubscribe(channel); + future.addListener(new FutureListener>() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future> future) throws Exception { subscribedChannelsAmount.release(); } }); diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index bea4b6842..f309b54fd 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -132,11 +132,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } }); - Future res = pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave"); - res.addListener(new FutureListener() { + Future> res = pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave"); + res.addListener(new FutureListener>() { @Override - public void operationComplete(Future future) throws Exception { - log.info("subscribed to channel: {} from Sentinel {}:{}", future.getNow().getChannels(), addr.getHost(), addr.getPort()); + public void operationComplete(Future> future) throws Exception { + log.info("subscribed to channels: {} from Sentinel {}:{}", future.getNow(), addr.getHost(), addr.getPort()); } }); }