eval read/write sharding. pubsub multichannel support

pull/243/head
Nikita 10 years ago
parent 18febb690e
commit 2b168cb0fa

@ -41,7 +41,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override
public boolean compareAndSet(long expect, long update) {
return connectionManager.eval(StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return connectionManager.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return true "
@ -62,7 +62,8 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override
public long getAndAdd(long delta) {
return connectionManager.eval(StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
return connectionManager.evalWrite(getName(),
StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local v = redis.call('get', KEYS[1]) or 0; "
+ "redis.call('set', KEYS[1], v + ARGV[1]); "
+ "return tonumber(v)",
@ -70,8 +71,9 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
}
@Override
public long getAndSet(final long newValue) {
return connectionManager.eval(StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
public long getAndSet(long newValue) {
return connectionManager.evalWrite(getName(),
StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)",
Collections.<Object>singletonList(getName()), newValue);
}

@ -93,7 +93,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
throw new NullPointerException();
}
List<V> list = connectionManager.eval(RedisCommands.EVAL_LIST,
List<V> list = connectionManager.evalWrite(getName(), RedisCommands.EVAL_LIST,
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals", Collections.<Object>singletonList(getName()));
@ -110,7 +110,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
throw new NullPointerException();
}
List<V> list = connectionManager.eval(RedisCommands.EVAL_LIST,
List<V> list = connectionManager.evalWrite(getName(), RedisCommands.EVAL_LIST,
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +

@ -203,7 +203,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return;
}
connectionManager.eval(RedisCommands.EVAL_BOOLEAN,
connectionManager.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" +
@ -234,14 +234,14 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean trySetCount(long count) {
return connectionManager.eval(RedisCommands.EVAL_BOOLEAN,
return connectionManager.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[2]); redis.call('publish', ARGV[3], ARGV[1]); return true else return false end",
Collections.<Object>singletonList(getName()), newCountMessage, count, getChannelName());
}
@Override
public Future<Boolean> deleteAsync() {
return connectionManager.evalAsync(RedisCommands.EVAL_BOOLEAN,
return connectionManager.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end",
Collections.<Object>singletonList(getName()), newCountMessage, getChannelName());
}

@ -72,7 +72,7 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
@Override
public Future<Long> countAsync() {
return connectionManager.readAsync(getName(), RedisCommands.PFCOUNT, getName());
return connectionManager.writeAsync(getName(), RedisCommands.PFCOUNT, getName());
}
@Override
@ -80,7 +80,7 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
args.add(getName());
args.addAll(Arrays.asList(otherLogNames));
return connectionManager.readAsync(getName(), RedisCommands.PFCOUNT, args.toArray());
return connectionManager.writeAsync(getName(), RedisCommands.PFCOUNT, args.toArray());
}
@Override

@ -25,7 +25,7 @@ import java.util.NoSuchElementException;
import org.redisson.client.protocol.BooleanReplayConvertor;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import static org.redisson.client.protocol.RedisCommands.*;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RList;
@ -50,7 +50,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public int size() {
Long size = connectionManager.read(getName(), RedisCommands.LLEN, getName());
Long size = connectionManager.read(getName(), LLEN, getName());
return size.intValue();
}
@ -76,7 +76,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
protected List<V> readAllList() {
return connectionManager.read(getName(), RedisCommands.LRANGE, getName(), 0, -1);
return connectionManager.read(getName(), LRANGE, getName(), 0, -1);
}
@Override
@ -101,7 +101,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
protected boolean remove(Object o, int count) {
return (Long)connectionManager.write(getName(), RedisCommands.LREM, getName(), count, o) > 0;
return (Long)connectionManager.write(getName(), LREM, getName(), count, o) > 0;
}
@Override
@ -114,7 +114,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
int to = div(size(), batchSize);
for (int i = 0; i < to; i++) {
final int j = i;
List<V> range = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), j*batchSize, j*batchSize + batchSize - 1);
List<V> range = connectionManager.read(getName(), LRANGE, getName(), j*batchSize, j*batchSize + batchSize - 1);
for (Iterator<Object> iterator = copy.iterator(); iterator.hasNext();) {
Object obj = iterator.next();
int index = range.indexOf(obj);
@ -143,7 +143,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
Future<Long> res = connectionManager.writeAsync(getName(), RedisCommands.RPUSH, args.toArray());
Future<Long> res = connectionManager.writeAsync(getName(), RPUSH, args.toArray());
res.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
@ -171,7 +171,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
Collections.reverse(elements);
elements.add(0, getName());
Long newSize = connectionManager.write(getName(), RedisCommands.LPUSH, elements.toArray());
Long newSize = connectionManager.write(getName(), LPUSH, elements.toArray());
return newSize != size;
}
@ -180,7 +180,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
args.addAll(coll);
return connectionManager.eval(new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
return connectionManager.evalWrite(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
@ -212,7 +212,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
boolean result = false;
for (Object object : c) {
boolean res = (Long)connectionManager.write(getName(), RedisCommands.LREM, getName(), 0, object) > 0;
boolean res = (Long)connectionManager.write(getName(), LREM, getName(), 0, object) > 0;
if (!result) {
result = res;
}
@ -240,7 +240,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<V> getAsync(int index) {
return connectionManager.readAsync(getName(), RedisCommands.LINDEX, getName(), index);
return connectionManager.readAsync(getName(), LINDEX, getName(), index);
}
@Override
@ -278,7 +278,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public V set(int index, V element) {
checkIndex(index);
return connectionManager.eval(new RedisCommand<Object>("EVAL", 5),
return connectionManager.evalWrite(getName(), new RedisCommand<Object>("EVAL", 5),
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
"return v",
@ -306,10 +306,10 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
checkIndex(index);
if (index == 0) {
return connectionManager.write(getName(), RedisCommands.LPOP, getName());
return connectionManager.write(getName(), LPOP, getName());
}
return connectionManager.eval(RedisCommands.EVAL_OBJECT,
return connectionManager.evalWrite(getName(), EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"local tail = redis.call('lrange', KEYS[1], ARGV[1]);" +
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
@ -324,7 +324,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return -1;
}
Long index = connectionManager.eval(new RedisCommand<Long>("EVAL", 4),
Long index = connectionManager.evalRead(getName(), new RedisCommand<Long>("EVAL", 4),
"local s = redis.call('llen', KEYS[1]);" +
"for i = 0, s, 1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" +
"return -1",
@ -338,7 +338,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return -1;
}
return ((Long)connectionManager.eval(new RedisCommand<Long>("EVAL", 4),
return ((Long)connectionManager.evalRead(getName(), new RedisCommand<Long>("EVAL", 4),
"local s = redis.call('llen', KEYS[1]);" +
"for i = s, 0, -1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" +
"return -1",
@ -455,7 +455,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
}
return connectionManager.read(getName(), RedisCommands.LRANGE, getName(), fromIndex, toIndex - 1);
return connectionManager.read(getName(), LRANGE, getName(), fromIndex, toIndex - 1);
}
public String toString() {

@ -267,7 +267,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private Long tryLockInner(final long leaseTime, final TimeUnit unit) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return connectionManager.eval(RedisCommands.EVAL_INTEGER,
return connectionManager.evalWrite(getName(), RedisCommands.EVAL_INTEGER,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " +
@ -342,7 +342,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void unlock() {
Boolean opStatus = connectionManager.eval(RedisCommands.EVAL_BOOLEAN,
Boolean opStatus = connectionManager.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('publish', ARGV[4], ARGV[2]); " +
@ -385,7 +385,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private Future<Boolean> forceUnlockAsync() {
stopRefreshTask();
return connectionManager.evalAsync(RedisCommands.EVAL_BOOLEAN,
return connectionManager.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN,
"redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true",
Collections.<Object>singletonList(getName()), unlockMessage, getChannelName());
}
@ -397,7 +397,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public boolean isHeldByCurrentThread() {
Boolean opStatus = connectionManager.eval(RedisCommands.EVAL_BOOLEAN,
Boolean opStatus = connectionManager.evalRead(getName(), RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" return false; " +
@ -415,7 +415,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public int getHoldCount() {
Long opStatus = connectionManager.eval(RedisCommands.EVAL_INTEGER,
Long opStatus = connectionManager.evalRead(getName(), RedisCommands.EVAL_INTEGER,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" return 0; " +
@ -423,7 +423,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
" local o = cjson.decode(v); " +
" return o['c']; " +
"end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId());
Collections.<Object>singletonList(getName()));
return opStatus.intValue();
}

@ -155,7 +155,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> putIfAbsentAsync(K key, V value) {
return connectionManager.evalAsync(EVAL_PUT,
return connectionManager.evalWriteAsync(getName(), EVAL_PUT,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return nil else return redis.call('hget', KEYS[1], ARGV[1]) end",
Collections.<Object>singletonList(getName()), key, value);
}
@ -167,7 +167,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Long> removeAsync(Object key, Object value) {
return connectionManager.evalAsync(new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP),
return connectionManager.evalWriteAsync(getName(),
new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP),
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then return redis.call('hdel', KEYS[1], ARGV[1]) else return 0 end",
Collections.<Object>singletonList(getName()), key, value);
}
@ -179,7 +180,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Boolean> replaceAsync(K key, V oldValue, V newValue) {
return connectionManager.evalAsync(new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4,
return connectionManager.evalWriteAsync(getName(),
new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4,
Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)),
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); return true; else return false; end",
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
@ -192,7 +194,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> replaceAsync(K key, V value) {
return connectionManager.evalAsync(new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE),
return connectionManager.evalWriteAsync(getName(),
new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE),
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v; else return nil; end",
Collections.<Object>singletonList(getName()), key, value);
}
@ -204,7 +207,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> putAsync(K key, V value) {
return connectionManager.evalAsync(EVAL_PUT,
return connectionManager.evalWriteAsync(getName(), EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v",
Collections.<Object>singletonList(getName()), key, value);
}
@ -212,7 +215,8 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> removeAsync(K key) {
return connectionManager.evalAsync(new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE),
return connectionManager.evalWriteAsync(getName(),
new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[1], ARGV[1]); return v",
Collections.<Object>singletonList(getName()), key);
}

@ -603,7 +603,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
String className = comparator.getClass().getName();
final String comparatorSign = className + ":" + calcClassSign(className);
Boolean res = connectionManager.eval(RedisCommands.EVAL_BOOLEAN,
Boolean res = connectionManager.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
"if redis.call('llen', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[1]); return true; "
+ "else return false; end",
Arrays.<Object>asList(getName(), getComparatorKeyName()), comparatorSign);

@ -15,6 +15,7 @@
*/
package org.redisson.client;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.CommandData;
@ -60,19 +61,19 @@ public class RedisPubSubConnection extends RedisConnection {
}
}
public Future<PubSubStatusMessage> subscribe(Codec codec, String ... channel) {
public Future<List<PubSubStatusMessage>> subscribe(Codec codec, String ... channel) {
return async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> psubscribe(Codec codec, String ... channel) {
public Future<List<PubSubStatusMessage>> psubscribe(Codec codec, String ... channel) {
return async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> unsubscribe(String ... channel) {
public Future<List<PubSubStatusMessage>> unsubscribe(String ... channel) {
return async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> punsubscribe(String ... channel) {
public Future<List<PubSubStatusMessage>> punsubscribe(String ... channel) {
return async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
}

@ -31,6 +31,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -126,7 +127,20 @@ public class CommandDecoder extends ReplayingDecoder<Void> {
}
Object result = messageDecoder(data, respParts).decode(respParts);
handleMultiResult(data, parts, channel, result);
if (result instanceof PubSubStatusMessage) {
if (parts == null) {
parts = new ArrayList<Object>();
}
parts.add(result);
// has next status messages
if (in.writerIndex() > in.readerIndex()) {
decode(in, data, parts, channel, currentDecoder);
} else {
handleMultiResult(data, null, channel, parts);
}
} else {
handleMultiResult(data, parts, channel, result);
}
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}

@ -201,4 +201,9 @@ public class RedisCommand<R> {
return outParamType;
}
@Override
public String toString() {
return "RedisCommand [name=" + name + ", subName=" + subName + "]";
}
}

@ -126,10 +126,10 @@ public interface RedisCommands {
RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 2);
RedisStrictCommand<PubSubStatusMessage> SUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("SUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<PubSubStatusMessage> UNSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("UNSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<PubSubStatusMessage> PSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<PubSubStatusMessage> PUNSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PUNSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<Object>("UNSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PSUBSCRIBE = new RedisCommand<Object>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<String> CLUSTER_NODES = new RedisStrictCommand<String>("CLUSTER", "NODES", new StringDataDecoder());

@ -15,7 +15,6 @@
*/
package org.redisson.client.protocol.pubsub;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.protocol.decoder.MultiDecoder;
@ -23,7 +22,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubStatusDecoder implements MultiDecoder<PubSubStatusMessage> {
public class PubSubStatusDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf) {
@ -34,11 +33,7 @@ public class PubSubStatusDecoder implements MultiDecoder<PubSubStatusMessage> {
@Override
public PubSubStatusMessage decode(List<Object> parts) {
List<String> channels = new ArrayList<String>();
for (Object part : parts.subList(1, parts.size()-1)) {
channels.add(part.toString());
}
return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), channels);
return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString());
}
@Override

@ -15,23 +15,21 @@
*/
package org.redisson.client.protocol.pubsub;
import java.util.List;
public class PubSubStatusMessage {
public enum Type {SUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, UNSUBSCRIBE}
private final Type type;
private final List<String> channels;
private final String channel;
public PubSubStatusMessage(Type type, List<String> channels) {
public PubSubStatusMessage(Type type, String channel) {
super();
this.type = type;
this.channels = channels;
this.channel = channel;
}
public List<String> getChannels() {
return channels;
public String getChannel() {
return channel;
}
public Type getType() {
@ -40,7 +38,7 @@ public class PubSubStatusMessage {
@Override
public String toString() {
return "PubSubStatusMessage [type=" + type + ", channels=" + channels + "]";
return "PubSubStatusMessage [type=" + type + ", channels=" + channel + "]";
}
}

@ -40,6 +40,23 @@ import io.netty.util.concurrent.Future;
//TODO ping support
public interface ConnectionManager {
<T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<R> R read(String key, SyncOperation<R> operation);
<R> R write(String key, SyncOperation<R> operation);
@ -52,8 +69,6 @@ public interface ConnectionManager {
<T, R> Future<R> writeAsync(Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R eval(RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalAsync(RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalAsync(Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
@ -64,8 +79,6 @@ public interface ConnectionManager {
<T, R> R read(String key, RedisCommand<T> command, Object ... params);
<T, R> R eval(Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params);

@ -317,35 +317,75 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
public <T, R> R write(String key, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(key, command, params);
public <T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalReadAsync(key, codec, evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Promise<R> mainPromise = getGroup().next().newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
int slot = calcSlot(key);
async(true, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise;
}
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalRead(key, codec, evalCommandType, script, keys, params);
}
public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Future<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}
public <T, R> Future<R> evalAsync(RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(codec, evalCommandType, script, keys, params);
public <T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalWriteAsync(key, codec, evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalAsync(Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Promise<R> mainPromise = getGroup().next().newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(false, -1, null, codec, evalCommandType, args.toArray(), mainPromise, 0);
int slot = calcSlot(key);
async(false, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise;
}
public <T, R> R eval(RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return eval(codec, evalCommandType, script, keys, params);
public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalWrite(key, codec, evalCommandType, script, keys, params);
}
public <T, R> R eval(Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Future<R> res = evalAsync(codec, evalCommandType, script, keys, params);
public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Future<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}
public <T, R> R write(String key, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(key, command, params);
return get(res);
}
public <T, R> Future<R> evalAsync(RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(codec, evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalAsync(Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Promise<R> mainPromise = getGroup().next().newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(false, -1, null, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise;
}
public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
return writeAsync(key, codec, command, params);
@ -416,7 +456,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} else {
connection = connectionWriteOp(slot);
}
log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr());
log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr());
connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ex.set(new RedisTimeoutException());
@ -559,7 +599,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Future<PubSubStatusMessage> subscribe(RedisPubSubListener listener, String channelName) {
public Future<List<PubSubStatusMessage>> subscribe(RedisPubSubListener listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry.subscribe(codec, listener, channelName);
@ -571,7 +611,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return group.next().newSucceededFuture(new PubSubStatusMessage(Type.SUBSCRIBE, Arrays.asList(channelName)));
return group.next().newSucceededFuture(Arrays.asList(new PubSubStatusMessage(Type.SUBSCRIBE, channelName)));
}
synchronized (entry) {
if (!entry.isActive()) {
@ -591,7 +631,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(slot, entry);
return group.next().newSucceededFuture(new PubSubStatusMessage(Type.SUBSCRIBE, Arrays.asList(channelName)));
return group.next().newSucceededFuture(Arrays.asList(new PubSubStatusMessage(Type.SUBSCRIBE, channelName)));
}
synchronized (entry) {
if (!entry.isActive()) {
@ -603,16 +643,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Future<PubSubStatusMessage> unsubscribe(String channelName) {
public Future<List<PubSubStatusMessage>> unsubscribe(String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return group.next().newSucceededFuture(null);
}
Future<PubSubStatusMessage> future = entry.unsubscribe(channelName);
future.addListener(new FutureListener<PubSubStatusMessage>() {
Future<List<PubSubStatusMessage>> future = entry.unsubscribe(channelName);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
@ -624,16 +664,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Future<PubSubStatusMessage> punsubscribe(String channelName) {
public Future<List<PubSubStatusMessage>> punsubscribe(String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return group.next().newSucceededFuture(null);
}
Future<PubSubStatusMessage> future = entry.punsubscribe(channelName);
future.addListener(new FutureListener<PubSubStatusMessage>() {
Future<List<PubSubStatusMessage>> future = entry.punsubscribe(channelName);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);

@ -21,6 +21,7 @@ import io.netty.util.concurrent.FutureListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -124,31 +125,33 @@ public class PubSubConnectionEntry {
}
public void subscribe(Codec codec, final String channelName) {
Future<PubSubStatusMessage> result = conn.subscribe(codec, channelName);
result.addListener(new FutureListener<PubSubStatusMessage>() {
Future<List<PubSubStatusMessage>> result = conn.subscribe(codec, channelName);
result.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
log.debug("subscribed to '{}' channel on server '{}'", channelName, conn.getRedisClient().getAddr());
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
if (future.isSuccess()) {
log.debug("subscribed to '{}' channel on server '{}'", channelName, conn.getRedisClient().getAddr());
}
}
});
}
public void psubscribe(Codec codec, final String pattern) {
Future<PubSubStatusMessage> result = conn.psubscribe(codec, pattern);
result.addListener(new FutureListener<PubSubStatusMessage>() {
Future<List<PubSubStatusMessage>> result = conn.psubscribe(codec, pattern);
result.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
log.debug("punsubscribed from '{}' pattern on server '{}'", pattern, conn.getRedisClient().getAddr());
}
});
}
public Future<PubSubStatusMessage> subscribe(Codec codec, RedisPubSubListener listener, String channel) {
public Future<List<PubSubStatusMessage>> subscribe(Codec codec, RedisPubSubListener listener, String channel) {
addListener(channel, listener);
return conn.subscribe(codec, channel);
}
public Future<PubSubStatusMessage> unsubscribe(final String channel) {
public Future<List<PubSubStatusMessage>> unsubscribe(final String channel) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
@ -156,17 +159,17 @@ public class PubSubConnectionEntry {
}
}
Future<PubSubStatusMessage> future = conn.unsubscribe(channel);
future.addListener(new FutureListener<PubSubStatusMessage>() {
Future<List<PubSubStatusMessage>> future = conn.unsubscribe(channel);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
subscribedChannelsAmount.release();
}
});
return future;
}
public Future<PubSubStatusMessage> punsubscribe(final String channel) {
public Future<List<PubSubStatusMessage>> punsubscribe(final String channel) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
@ -174,10 +177,10 @@ public class PubSubConnectionEntry {
}
}
Future<PubSubStatusMessage> future = conn.punsubscribe(channel);
future.addListener(new FutureListener<PubSubStatusMessage>() {
Future<List<PubSubStatusMessage>> future = conn.punsubscribe(channel);
future.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
subscribedChannelsAmount.release();
}
});

@ -132,11 +132,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
});
Future<PubSubStatusMessage> res = pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave");
res.addListener(new FutureListener<PubSubStatusMessage>() {
Future<List<PubSubStatusMessage>> res = pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave");
res.addListener(new FutureListener<List<PubSubStatusMessage>>() {
@Override
public void operationComplete(Future<PubSubStatusMessage> future) throws Exception {
log.info("subscribed to channel: {} from Sentinel {}:{}", future.getNow().getChannels(), addr.getHost(), addr.getPort());
public void operationComplete(Future<List<PubSubStatusMessage>> future) throws Exception {
log.info("subscribed to channels: {} from Sentinel {}:{}", future.getNow(), addr.getHost(), addr.getPort());
}
});
}

Loading…
Cancel
Save