Merge branch 'mrniko/master'

pull/282/head
Rui Gu 10 years ago
commit 11f53363a2

@ -69,7 +69,7 @@ class BaseConfig<T extends BaseConfig<T>> {
setTimeout(config.getTimeout());
setClientName(config.getClientName());
setPingTimeout(config.getPingTimeout());
setCloseConnectionAfterFailAttempts(config.getCloseConnectionAfterFailAttempts());
setRefreshConnectionAfterFails(config.getRefreshConnectionAfterFails());
}
/**
@ -190,16 +190,16 @@ class BaseConfig<T extends BaseConfig<T>> {
}
/**
* Close connection if it has <code>failAttemptsAmount</code>
* Reconnect connection if it has <code>failAttemptsAmount</code>
* fails in a row during command sending. Turned off by default.
*
* @param failAttemptsAmount
*/
public T setCloseConnectionAfterFailAttempts(int failAttemptsAmount) {
public T setRefreshConnectionAfterFails(int failAttemptsAmount) {
this.closeConnectionAfterFailAttempts = failAttemptsAmount;
return (T) this;
}
public int getCloseConnectionAfterFailAttempts() {
public int getRefreshConnectionAfterFails() {
return closeConnectionAfterFailAttempts;
}

@ -278,12 +278,12 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}
@Override
public <R> R read(String key, SyncOperation<R> operation) {
public <R> R read(String key, Codec codec, SyncOperation<R> operation) {
throw new UnsupportedOperationException();
}
@Override
public <R> R write(String key, SyncOperation<R> operation) {
public <R> R write(String key, Codec codec, SyncOperation<R> operation) {
throw new UnsupportedOperationException();
}

@ -60,9 +60,9 @@ public interface CommandExecutor {
<T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<R> R read(String key, SyncOperation<R> operation);
<R> R read(String key, Codec codec, SyncOperation<R> operation);
<R> R write(String key, SyncOperation<R> operation);
<R> R write(String key, Codec codec, SyncOperation<R> operation);
<T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params);

@ -236,17 +236,17 @@ public class CommandExecutorService implements CommandExecutor {
return readAsync(key, connectionManager.getCodec(), command, params);
}
public <R> R write(String key, SyncOperation<R> operation) {
public <R> R write(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return async(false, slot, operation, 0);
return async(false, codec, slot, operation, 0);
}
public <R> R read(String key, SyncOperation<R> operation) {
public <R> R read(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return async(true, slot, operation, 0);
return async(true, codec, slot, operation, 0);
}
private <R> R async(boolean readOnlyMode, int slot, SyncOperation<R> operation, int attempt) {
private <R> R async(boolean readOnlyMode, Codec codec, int slot, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
return null;
}
@ -259,15 +259,15 @@ public class CommandExecutorService implements CommandExecutor {
connection = connectionManager.connectionWriteOp(slot);
}
try {
return operation.execute(connectionManager.getCodec(), connection);
return operation.execute(codec, connection);
} catch (RedisMovedException e) {
return async(readOnlyMode, e.getSlot(), operation, attempt);
return async(readOnlyMode, codec, e.getSlot(), operation, attempt);
} catch (RedisTimeoutException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
attempt++;
return async(readOnlyMode, slot, operation, attempt);
return async(readOnlyMode, codec, slot, operation, attempt);
} finally {
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
@ -286,7 +286,7 @@ public class CommandExecutorService implements CommandExecutor {
Thread.currentThread().interrupt();
}
attempt++;
return async(readOnlyMode, slot, operation, attempt);
return async(readOnlyMode, codec, slot, operation, attempt);
}
}

@ -22,6 +22,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClusterConnectionManager;
import org.redisson.connection.ConnectionManager;
@ -123,6 +124,12 @@ public class Redisson implements RedissonClient {
return new RedissonBucket<V>(commandExecutor, name);
}
@Override
public <V> RBucket<V> getBucket(String name, Codec codec) {
return new RedissonBucket<V>(codec, commandExecutor, name);
}
/**
* Returns a list of object holder by a key pattern
*
@ -157,6 +164,12 @@ public class Redisson implements RedissonClient {
return new RedissonHyperLogLog<V>(commandExecutor, name);
}
@Override
public <V> RHyperLogLog<V> getHyperLogLog(String name, Codec codec) {
return new RedissonHyperLogLog<V>(codec, commandExecutor, name);
}
/**
* Returns distributed list instance by name.
*
@ -168,6 +181,11 @@ public class Redisson implements RedissonClient {
return new RedissonList<V>(commandExecutor, name);
}
@Override
public <V> RList<V> getList(String name, Codec codec) {
return new RedissonList<V>(codec, commandExecutor, name);
}
/**
* Returns distributed map instance by name.
*
@ -179,6 +197,11 @@ public class Redisson implements RedissonClient {
return new RedissonMap<K, V>(commandExecutor, name);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, commandExecutor, name);
}
/**
* Returns distributed lock instance by name.
*
@ -201,11 +224,18 @@ public class Redisson implements RedissonClient {
return new RedissonSet<V>(commandExecutor, name);
}
@Override
public <V> RSet<V> getSet(String name, Codec codec) {
return new RedissonSet<V>(codec, commandExecutor, name);
}
/**
* Returns script with eval-operations support
*
* @return
*/
@Override
public RScript getScript() {
return new RedissonScript(commandExecutor);
}
@ -221,10 +251,37 @@ public class Redisson implements RedissonClient {
return new RedissonSortedSet<V>(commandExecutor, name);
}
@Override
public <V> RSortedSet<V> getSortedSet(String name, Codec codec) {
return new RedissonSortedSet<V>(codec, commandExecutor, name);
}
/**
* Returns Redis Sorted Set instance by name
*
* @param name
* @return
*/
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(commandExecutor, name);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSet<V>(codec, commandExecutor, name);
}
/**
* Returns String based Redis Sorted Set instance by name
* All elements are inserted with the same score during addition,
* in order to force lexicographical ordering
*
* @param name
* @return
*/
@Override
public RLexSortedSet getLexSortedSet(String name) {
return new RedissonLexSortedSet(commandExecutor, name);
}
@ -240,6 +297,11 @@ public class Redisson implements RedissonClient {
return new RedissonTopic<M>(commandExecutor, name);
}
@Override
public <M> RTopic<M> getTopic(String name, Codec codec) {
return new RedissonTopic<M>(codec, commandExecutor, name);
}
/**
* Returns topic instance satisfies by pattern name.
*
@ -256,6 +318,12 @@ public class Redisson implements RedissonClient {
return new RedissonPatternTopic<M>(commandExecutor, pattern);
}
@Override
public <M> RPatternTopic<M> getPatternTopic(String pattern, Codec codec) {
return new RedissonPatternTopic<M>(codec, commandExecutor, pattern);
}
/**
* Returns distributed queue instance by name.
*
@ -267,6 +335,11 @@ public class Redisson implements RedissonClient {
return new RedissonQueue<V>(commandExecutor, name);
}
@Override
public <V> RQueue<V> getQueue(String name, Codec codec) {
return new RedissonQueue<V>(codec, commandExecutor, name);
}
/**
* Returns distributed blocking queue instance by name.
*
@ -278,6 +351,11 @@ public class Redisson implements RedissonClient {
return new RedissonBlockingQueue<V>(commandExecutor, name);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, commandExecutor, name);
}
/**
* Returns distributed deque instance by name.
*
@ -289,6 +367,11 @@ public class Redisson implements RedissonClient {
return new RedissonDeque<V>(commandExecutor, name);
}
@Override
public <V> RDeque<V> getDeque(String name, Codec codec) {
return new RedissonDeque<V>(codec, commandExecutor, name);
}
/**
* Returns distributed "atomic long" instance by name.
*

@ -17,6 +17,7 @@ package org.redisson;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLongAsync;
import org.redisson.core.RBatch;
@ -25,10 +26,11 @@ import org.redisson.core.RBucketAsync;
import org.redisson.core.RDequeAsync;
import org.redisson.core.RHyperLogLogAsync;
import org.redisson.core.RKeysAsync;
import org.redisson.core.RLexSortedSetAsync;
import org.redisson.core.RListAsync;
import org.redisson.core.RMapAsync;
import org.redisson.core.RQueueAsync;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScoredSortedSetAsync;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RTopicAsync;
@ -48,56 +50,101 @@ public class RedissonBatch implements RBatch {
return new RedissonBucket<V>(executorService, name);
}
public <V> RBucketAsync<V> getBucket(String name, Codec codec) {
return new RedissonBucket<V>(codec, executorService, name);
}
@Override
public <V> RHyperLogLogAsync<V> getHyperLogLog(String name) {
return new RedissonHyperLogLog<V>(executorService, name);
}
public <V> RHyperLogLogAsync<V> getHyperLogLog(String name, Codec codec) {
return new RedissonHyperLogLog<V>(codec, executorService, name);
}
@Override
public <V> RListAsync<V> getList(String name) {
return new RedissonList<V>(executorService, name);
}
public <V> RListAsync<V> getList(String name, Codec codec) {
return new RedissonList<V>(codec, executorService, name);
}
@Override
public <K, V> RMapAsync<K, V> getMap(String name) {
return new RedissonMap<K, V>(executorService, name);
}
public <K, V> RMapAsync<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, executorService, name);
}
@Override
public <V> RSetAsync<V> getSet(String name) {
return new RedissonSet<V>(executorService, name);
}
public <V> RSetAsync<V> getSet(String name, Codec codec) {
return new RedissonSet<V>(codec, executorService, name);
}
@Override
public <M> RTopicAsync<M> getTopic(String name) {
return new RedissonTopic<M>(executorService, name);
}
public <M> RTopicAsync<M> getTopic(String name, Codec codec) {
return new RedissonTopic<M>(codec, executorService, name);
}
@Override
public <V> RQueueAsync<V> getQueue(String name) {
return new RedissonQueue<V>(executorService, name);
}
public <V> RQueueAsync<V> getQueue(String name, Codec codec) {
return new RedissonQueue<V>(codec, executorService, name);
}
@Override
public <V> RBlockingQueueAsync<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<V>(executorService, name);
}
public <V> RBlockingQueueAsync<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, executorService, name);
}
@Override
public <V> RDequeAsync<V> getDequeAsync(String name) {
return new RedissonDeque<V>(executorService, name);
}
public <V> RDequeAsync<V> getDequeAsync(String name, Codec codec) {
return new RedissonDeque<V>(codec, executorService, name);
}
@Override
public RAtomicLongAsync getAtomicLongAsync(String name) {
return new RedissonAtomicLong(executorService, name);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
public <V> RScoredSortedSetAsync<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(executorService, name);
}
public <V> RScoredSortedSetAsync<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSet<V>(codec, executorService, name);
}
@Override
public RLexSortedSetAsync getLexSortedSet(String name) {
return new RedissonLexSortedSet(executorService, name);
}
@Override
public RScriptAsync getScript() {
return new RedissonScript(executorService);

@ -21,6 +21,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.decoder.ListDrainToDecoder;
@ -43,6 +44,10 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
super(commandExecutor, name);
}
protected RedissonBlockingQueue(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public Future<Boolean> putAsync(V e) {
return offerAsync(e);
@ -60,7 +65,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public Future<V> takeAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), 0);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}
@Override
@ -71,7 +76,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public Future<V> pollAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
}
@Override
@ -88,7 +93,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public Future<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
}
@Override
@ -113,7 +118,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
throw new NullPointerException();
}
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals", Collections.<Object>singletonList(getName()));
@ -133,7 +138,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +

@ -17,6 +17,7 @@ package org.redisson;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RBucket;
@ -28,6 +29,10 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
super(connectionManager, name);
}
protected RedissonBucket(Codec codec, CommandExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
@Override
public V get() {
return get(getAsync());
@ -35,7 +40,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public Future<V> getAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.GET, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.GET, getName());
}
@Override
@ -45,7 +50,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public Future<Void> setAsync(V value) {
return commandExecutor.writeAsync(getName(), RedisCommands.SET, getName(), value);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SET, getName(), value);
}
@Override
@ -55,7 +60,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeAsync(getName(), RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
}
@Override
@ -65,7 +70,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public Future<Boolean> existsAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.EXISTS, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.EXISTS, getName());
}
}

@ -18,6 +18,7 @@ package org.redisson;
import java.util.Collection;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.ClusterNode;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
@ -53,6 +54,8 @@ public interface RedissonClient {
*/
<V> RBucket<V> getBucket(String name);
<V> RBucket<V> getBucket(String name, Codec codec);
/**
* Returns a list of object holder by a key pattern
*/
@ -66,6 +69,8 @@ public interface RedissonClient {
*/
<V> RHyperLogLog<V> getHyperLogLog(String name);
<V> RHyperLogLog<V> getHyperLogLog(String name, Codec codec);
/**
* Returns list instance by name.
*
@ -74,6 +79,8 @@ public interface RedissonClient {
*/
<V> RList<V> getList(String name);
<V> RList<V> getList(String name, Codec codec);
/**
* Returns map instance by name.
*
@ -82,6 +89,8 @@ public interface RedissonClient {
*/
<K, V> RMap<K, V> getMap(String name);
<K, V> RMap<K, V> getMap(String name, Codec codec);
/**
* Returns lock instance by name.
*
@ -98,6 +107,8 @@ public interface RedissonClient {
*/
<V> RSet<V> getSet(String name);
<V> RSet<V> getSet(String name, Codec codec);
/**
* Returns sorted set instance by name.
*
@ -106,8 +117,26 @@ public interface RedissonClient {
*/
<V> RSortedSet<V> getSortedSet(String name);
<V> RSortedSet<V> getSortedSet(String name, Codec codec);
/**
* Returns Redis Sorted Set instance by name
*
* @param name
* @return
*/
<V> RScoredSortedSet<V> getScoredSortedSet(String name);
<V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec);
/**
* Returns String based Redis Sorted Set instance by name
* All elements are inserted with the same score during addition,
* in order to force lexicographical ordering
*
* @param name
* @return
*/
RLexSortedSet getLexSortedSet(String name);
/**
@ -118,6 +147,8 @@ public interface RedissonClient {
*/
<M> RTopic<M> getTopic(String name);
<M> RTopic<M> getTopic(String name, Codec codec);
/**
* Returns topic instance satisfies by pattern name.
*
@ -131,6 +162,8 @@ public interface RedissonClient {
*/
<M> RPatternTopic<M> getPatternTopic(String pattern);
<M> RPatternTopic<M> getPatternTopic(String pattern, Codec codec);
/**
* Returns queue instance by name.
*
@ -139,6 +172,8 @@ public interface RedissonClient {
*/
<V> RQueue<V> getQueue(String name);
<V> RQueue<V> getQueue(String name, Codec codec);
/**
* Returns blocking queue instance by name.
*
@ -147,6 +182,8 @@ public interface RedissonClient {
*/
<V> RBlockingQueue<V> getBlockingQueue(String name);
<V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec);
/**
* Returns deque instance by name.
*
@ -155,6 +192,8 @@ public interface RedissonClient {
*/
<V> RDeque<V> getDeque(String name);
<V> RDeque<V> getDeque(String name, Codec codec);
/**
* Returns "atomic long" instance by name.
*

@ -18,6 +18,7 @@ package org.redisson;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -47,6 +48,10 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
super(commandExecutor, name);
}
public RedissonDeque(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public void addFirst(V e) {
get(addFirstAsync(e));
@ -54,7 +59,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public Future<Void> addFirstAsync(V e) {
return commandExecutor.writeAsync(getName(), LPUSH_VOID, getName(), e);
return commandExecutor.writeAsync(getName(), codec, LPUSH_VOID, getName(), e);
}
@Override
@ -64,7 +69,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public Future<Void> addLastAsync(V e) {
return commandExecutor.writeAsync(getName(), RPUSH_VOID, getName(), e);
return commandExecutor.writeAsync(getName(), codec, RPUSH_VOID, getName(), e);
}
@ -106,7 +111,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public Future<V> getLastAsync() {
return commandExecutor.readAsync(getName(), LRANGE_SINGLE, getName(), -1, -1);
return commandExecutor.readAsync(getName(), codec, LRANGE_SINGLE, getName(), -1, -1);
}
@Override
@ -125,7 +130,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public Future<Boolean> offerFirstAsync(V e) {
return commandExecutor.writeAsync(getName(), LPUSH_BOOLEAN, getName(), e);
return commandExecutor.writeAsync(getName(), codec, LPUSH_BOOLEAN, getName(), e);
}
@Override
@ -170,7 +175,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public Future<V> pollLastAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName());
}
@ -216,7 +221,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public Future<V> removeLastAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName());
}
@Override

@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RHyperLogLog;
@ -31,6 +32,10 @@ public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperL
super(commandExecutor, name);
}
protected RedissonHyperLogLog(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public boolean add(V obj) {
return get(addAsync(obj));
@ -58,7 +63,7 @@ public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperL
@Override
public Future<Boolean> addAsync(V obj) {
return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, getName(), obj);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFADD, getName(), obj);
}
@Override
@ -66,12 +71,12 @@ public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperL
List<Object> args = new ArrayList<Object>(objects.size() + 1);
args.add(getName());
args.addAll(objects);
return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, getName(), args.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFADD, getName(), args.toArray());
}
@Override
public Future<Long> countAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT, getName());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFCOUNT, getName());
}
@Override
@ -79,7 +84,7 @@ public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperL
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
args.add(getName());
args.addAll(Arrays.asList(otherLogNames));
return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT, args.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFCOUNT, args.toArray());
}
@Override
@ -87,7 +92,7 @@ public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperL
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
args.add(getName());
args.addAll(Arrays.asList(otherLogNames));
return commandExecutor.writeAsync(getName(), RedisCommands.PFMERGE, args.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFMERGE, args.toArray());
}
}

@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RLexSortedSet;

@ -33,6 +33,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
@ -58,13 +59,17 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
super(commandExecutor, name);
}
protected RedissonList(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public int size() {
return get(sizeAsync());
}
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), LLEN, getName());
return commandExecutor.readAsync(getName(), codec, LLEN, getName());
}
@Override
@ -93,7 +98,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
private Future<Collection<V>> readAllAsync() {
return commandExecutor.readAsync(getName(), LRANGE, getName(), 0, -1);
return commandExecutor.readAsync(getName(), codec, LRANGE, getName(), 0, -1);
}
@Override
@ -109,7 +114,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Boolean> addAsync(V e) {
return commandExecutor.writeAsync(getName(), RPUSH_BOOLEAN, getName(), e);
return commandExecutor.writeAsync(getName(), codec, RPUSH_BOOLEAN, getName(), e);
}
@Override
@ -123,7 +128,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
protected Future<Boolean> removeAsync(Object o, int count) {
return commandExecutor.writeAsync(getName(), LREM_SINGLE, getName(), count, o);
return commandExecutor.writeAsync(getName(), codec, LREM_SINGLE, getName(), count, o);
}
protected boolean remove(Object o, int count) {
@ -132,7 +137,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i=1, #items do " +
"for j = 0, table.getn(ARGV), 1 do " +
@ -166,7 +171,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
Future<Long> res = commandExecutor.writeAsync(getName(), RPUSH, args.toArray());
Future<Long> res = commandExecutor.writeAsync(getName(), codec, RPUSH, args.toArray());
res.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
@ -194,7 +199,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
Collections.reverse(elements);
elements.add(0, getName());
Long newSize = commandExecutor.write(getName(), LPUSH, elements.toArray());
Long newSize = commandExecutor.write(getName(), codec, LPUSH, elements.toArray());
return newSize != size;
}
@ -203,7 +208,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
args.addAll(coll);
return commandExecutor.evalWrite(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
return commandExecutor.evalWrite(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
@ -219,7 +224,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
@ -241,7 +246,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
+ "local i = 1 "
@ -273,7 +278,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<V> getAsync(int index) {
return commandExecutor.readAsync(getName(), LINDEX, getName(), index);
return commandExecutor.readAsync(getName(), codec, LINDEX, getName(), index);
}
@Override
@ -314,7 +319,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<V> setAsync(int index, V element) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", 5),
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", 5),
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
"return v",
@ -329,7 +334,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Void> fastSetAsync(int index, V element) {
return commandExecutor.writeAsync(getName(), RedisCommands.LSET, getName(), index, element);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LSET, getName(), index, element);
}
@Override
@ -342,10 +347,10 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
checkIndex(index);
if (index == 0) {
return commandExecutor.write(getName(), LPOP, getName());
return commandExecutor.write(getName(), codec, LPOP, getName());
}
return commandExecutor.evalWrite(getName(), EVAL_OBJECT,
return commandExecutor.evalWrite(getName(), codec, EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"local tail = redis.call('lrange', KEYS[1], ARGV[1]);" +
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
@ -365,7 +370,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
private <R> Future<R> indexOfAsync(Object o, Convertor<R> convertor) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<R>("EVAL", convertor, 4),
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<R>("EVAL", convertor, 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
"local items = redis.call('lrange', key, 0, -1) " +
@ -390,7 +395,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Integer> lastIndexOfAsync(Object o) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
"local items = redis.call('lrange', key, 0, -1) " +
@ -513,7 +518,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
}
return commandExecutor.read(getName(), LRANGE, getName(), fromIndex, toIndex - 1);
return commandExecutor.read(getName(), codec, LRANGE, getName(), fromIndex, toIndex - 1);
}
public String toString() {

@ -28,6 +28,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -53,12 +54,20 @@ import io.netty.util.concurrent.Future;
//TODO implement watching by keys instead of map name
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
private final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
protected RedissonMap(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
public RedissonMap(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public int size() {
return get(sizeAsync());
@ -66,7 +75,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.HLEN, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName());
}
@Override
@ -76,12 +85,12 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public boolean containsKey(Object key) {
return commandExecutor.read(getName(), RedisCommands.HEXISTS, getName(), key);
return commandExecutor.read(getName(), codec, RedisCommands.HEXISTS, getName(), key);
}
@Override
public Future<Boolean> containsKeyAsync(Object key) {
return commandExecutor.readAsync(getName(), RedisCommands.HEXISTS, getName(), key);
return commandExecutor.readAsync(getName(), codec, RedisCommands.HEXISTS, getName(), key);
}
@Override
@ -91,7 +100,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Boolean> containsValueAsync(Object value) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
+ "if ARGV[1] == s[i] then "
@ -110,7 +119,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
List<Object> args = new ArrayList<Object>(keys.size() + 1);
args.add(getName());
args.addAll(keys);
List<V> list = commandExecutor.read(getName(), RedisCommands.HMGET, args.toArray());
List<V> list = commandExecutor.read(getName(), codec, RedisCommands.HMGET, args.toArray());
Map<K, V> result = new HashMap<K, V>(list.size());
for (int index = 0; index < args.size()-1; index++) {
@ -144,7 +153,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return;
}
commandExecutor.write(getName(), RedisCommands.HMSET, getName(), map);
commandExecutor.write(getName(), codec, RedisCommands.HMSET, getName(), map);
}
@Override
@ -159,7 +168,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Set<K>> keySetAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.HKEYS, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.HKEYS, getName());
}
@Override
@ -169,12 +178,12 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Collection<V>> valuesAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.HVALS, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.HVALS, getName());
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
Map<K, V> map = commandExecutor.read(getName(), RedisCommands.HGETALL, getName());
Map<K, V> map = commandExecutor.read(getName(), codec, RedisCommands.HGETALL, getName());
return map.entrySet();
}
@ -185,7 +194,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> putIfAbsentAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), EVAL_PUT,
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return nil else return redis.call('hget', KEYS[1], ARGV[1]) end",
Collections.<Object>singletonList(getName()), key, value);
}
@ -197,8 +206,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Long> removeAsync(Object key, Object value) {
return commandExecutor.evalWriteAsync(getName(),
new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP),
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then return redis.call('hdel', KEYS[1], ARGV[1]) else return 0 end",
Collections.<Object>singletonList(getName()), key, value);
}
@ -210,9 +218,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Boolean> replaceAsync(K key, V oldValue, V newValue) {
return commandExecutor.evalWriteAsync(getName(),
new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4,
Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)),
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); return true; else return false; end",
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
}
@ -224,20 +230,19 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> replaceAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(),
new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE),
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v; else return nil; end",
Collections.<Object>singletonList(getName()), key, value);
}
@Override
public Future<V> getAsync(K key) {
return commandExecutor.readAsync(getName(), RedisCommands.HGET, getName(), key);
return commandExecutor.readAsync(getName(), codec, RedisCommands.HGET, getName(), key);
}
@Override
public Future<V> putAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), EVAL_PUT,
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v",
Collections.<Object>singletonList(getName()), key, value);
}
@ -245,15 +250,14 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> removeAsync(K key) {
return commandExecutor.evalWriteAsync(getName(),
new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE),
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[1], ARGV[1]); return v",
Collections.<Object>singletonList(getName()), key);
}
@Override
public Future<Boolean> fastPutAsync(K key, V value) {
return commandExecutor.writeAsync(getName(), RedisCommands.HSET, getName(), key, value);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.HSET, getName(), key, value);
}
@Override
@ -270,7 +274,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
List<Object> args = new ArrayList<Object>(keys.length + 1);
args.add(getName());
args.addAll(Arrays.asList(keys));
return commandExecutor.writeAsync(getName(), RedisCommands.HDEL, args.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.HDEL, args.toArray());
}
@Override
@ -279,7 +283,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
private MapScanResult<Object, V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(client, getName(), RedisCommands.HSCAN, getName(), startPos);
return commandExecutor.read(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos);
}
private Iterator<Map.Entry<K, V>> iterator() {

@ -19,6 +19,7 @@ import java.util.Collections;
import java.util.List;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.PatternMessageListener;
import org.redisson.core.PatternStatusListener;
@ -35,10 +36,16 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
final CommandExecutor commandExecutor;
private final String name;
private final Codec codec;
protected RedissonPatternTopic(CommandExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected RedissonPatternTopic(Codec codec, CommandExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
}
@Override
@ -53,7 +60,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name, codec);
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);

@ -17,6 +17,7 @@ package org.redisson;
import java.util.NoSuchElementException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RQueue;
@ -35,6 +36,10 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
super(commandExecutor, name);
}
protected RedissonQueue(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public boolean offer(V e) {
return add(e);
@ -68,7 +73,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
@Override
public Future<V> pollAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.LPOP, getName());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPOP, getName());
}
@Override
@ -98,7 +103,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
@Override
public Future<V> pollLastAndOfferFirstToAsync(String queueName) {
return commandExecutor.writeAsync(getName(), RedisCommands.RPOPLPUSH, getName(), queueName);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOPLPUSH, getName(), queueName);
}
@Override

@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RScript;
@ -64,63 +65,99 @@ public class RedissonScript implements RScript {
@Override
public <R> R eval(Mode mode, String luaScript, ReturnType returnType) {
return eval(null, mode, luaScript, returnType);
return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType);
}
public <R> R eval(String key, Mode mode, String luaScript, ReturnType returnType) {
return eval(key, mode, luaScript, returnType, Collections.emptyList());
@Override
public <R> R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType) {
return eval(null, mode, codec, luaScript, returnType);
}
public <R> R eval(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType) {
return eval(key, mode, codec, luaScript, returnType, Collections.emptyList());
}
@Override
public <R> R eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
return eval(null, mode, luaScript, returnType, keys, values);
return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, keys, values);
}
@Override
public <R> R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
return eval(null, mode, codec, luaScript, returnType, keys, values);
}
public <R> R eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
return (R) commandExecutor.get(evalAsync(key, mode, luaScript, returnType, keys, values));
public <R> R eval(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
return (R) commandExecutor.get(evalAsync(key, mode, codec, luaScript, returnType, keys, values));
}
@Override
public <R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
return evalAsync(null, mode, luaScript, returnType, keys, values);
return evalAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, keys, values);
}
public <R> Future<R> evalAsync(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
@Override
public <R> Future<R> evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
return evalAsync(null, mode, codec, luaScript, returnType, keys, values);
}
public <R> Future<R> evalAsync(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
if (mode == Mode.READ_ONLY) {
return commandExecutor.evalReadAsync(key, returnType.getCommand(), luaScript, keys, values);
return commandExecutor.evalReadAsync(key, codec, returnType.getCommand(), luaScript, keys, values);
}
return commandExecutor.evalWriteAsync(key, returnType.getCommand(), luaScript, keys, values);
return commandExecutor.evalWriteAsync(key, codec, returnType.getCommand(), luaScript, keys, values);
}
@Override
public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType) {
return evalSha(null, mode, shaDigest, returnType);
return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType);
}
@Override
public <R> R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType) {
return evalSha(null, mode, codec, shaDigest, returnType);
}
public <R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType) {
return evalSha(key, mode, shaDigest, returnType, Collections.emptyList());
return evalSha(key, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, Collections.emptyList());
}
public <R> R evalSha(String key, Mode mode, Codec codec, String shaDigest, ReturnType returnType) {
return evalSha(key, mode, codec, shaDigest, returnType, Collections.emptyList());
}
@Override
public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
return evalSha(null, mode, shaDigest, returnType, keys, values);
return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, keys, values);
}
public <R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
return (R) commandExecutor.get(evalShaAsync(key, mode, shaDigest, returnType, keys, values));
@Override
public <R> R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
return evalSha(null, mode, codec, shaDigest, returnType, keys, values);
}
public <R> R evalSha(String key, Mode mode, Codec codec, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
return (R) commandExecutor.get(evalShaAsync(key, mode, codec, shaDigest, returnType, keys, values));
}
@Override
public <R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
return evalShaAsync(null, mode, shaDigest, returnType, keys, values);
return evalShaAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, keys, values);
}
@Override
public <R> Future<R> evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
return evalShaAsync(null, mode, codec, shaDigest, returnType, keys, values);
}
public <R> Future<R> evalShaAsync(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
public <R> Future<R> evalShaAsync(String key, Mode mode, Codec codec, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
RedisCommand command = new RedisCommand(returnType.getCommand(), "EVALSHA");
if (mode == Mode.READ_ONLY) {
return commandExecutor.evalReadAsync(key, command, shaDigest, keys, values);
return commandExecutor.evalReadAsync(key, codec, command, shaDigest, keys, values);
}
return commandExecutor.evalWriteAsync(key, command, shaDigest, keys, values);
return commandExecutor.evalWriteAsync(key, codec, command, shaDigest, keys, values);
}
@Override
@ -196,12 +233,22 @@ public class RedissonScript implements RScript {
@Override
public <R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType) {
return evalShaAsync(null, mode, shaDigest, returnType, Collections.emptyList());
return evalShaAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, Collections.emptyList());
}
@Override
public <R> Future<R> evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType) {
return evalShaAsync(null, mode, codec, shaDigest, returnType, Collections.emptyList());
}
@Override
public <R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType) {
return evalAsync(null, mode, luaScript, returnType, Collections.emptyList());
return evalAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, Collections.emptyList());
}
@Override
public <R> Future<R> evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType) {
return evalAsync(null, mode, codec, luaScript, returnType, Collections.emptyList());
}
}

@ -23,6 +23,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
@ -40,10 +41,16 @@ import io.netty.util.concurrent.Future;
*/
public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
private static final RedisCommand<Boolean> EVAL_OBJECTS = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4);
protected RedissonSet(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
public RedissonSet(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public int size() {
return get(sizeAsync());
@ -51,7 +58,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.SCARD, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.SCARD, getName());
}
@Override
@ -66,11 +73,11 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> containsAsync(Object o) {
return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o);
return commandExecutor.readAsync(getName(), codec, RedisCommands.SISMEMBER, getName(), o);
}
private ListScanResult<V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(client, getName(), RedisCommands.SSCAN, getName(), startPos);
return commandExecutor.read(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
}
@Override
@ -128,7 +135,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
}
private Future<Collection<V>> readAllAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.SMEMBERS, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.SMEMBERS, getName());
}
@Override
@ -150,7 +157,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> addAsync(V e) {
return commandExecutor.writeAsync(getName(), RedisCommands.SADD_SINGLE, getName(), e);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_SINGLE, getName(), e);
}
@Override
@ -160,12 +167,12 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<V> removeRandomAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.SPOP_SINGLE, getName());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP_SINGLE, getName());
}
@Override
public Future<Boolean> removeAsync(Object o) {
return commandExecutor.writeAsync(getName(), RedisCommands.SREM_SINGLE, getName(), o);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SREM_SINGLE, getName(), o);
}
@Override
@ -180,7 +187,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return commandExecutor.evalReadAsync(getName(), codec, EVAL_OBJECTS,
"local s = redis.call('smembers', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
@ -206,7 +213,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
return commandExecutor.writeAsync(getName(), RedisCommands.SADD, args.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD, args.toArray());
}
@Override
@ -216,7 +223,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS,
"local changed = false " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
@ -241,7 +248,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS,
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "

@ -102,8 +102,16 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.SETNX, getCurrentVersionKey(), 0L);
}
public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
loadComparator();
commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.SETNX, getCurrentVersionKey(), 0L);
}
private void loadComparator() {
commandExecutor.read(getName(), new SyncOperation<Void>() {
commandExecutor.read(getName(), codec, new SyncOperation<Void>() {
@Override
public Void execute(Codec codec, RedisConnection conn) {
loadComparator(conn);
@ -155,7 +163,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public int size() {
return commandExecutor.read(getName(), RedisCommands.LLEN, getName());
return commandExecutor.read(getName(), codec, RedisCommands.LLEN, getName());
}
private int size(RedisConnection connection) {
@ -169,7 +177,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public boolean contains(final Object o) {
return commandExecutor.read(getName(), new SyncOperation<Boolean>() {
return commandExecutor.read(getName(), codec, new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return binarySearch((V)o, codec, conn).getIndex() >= 0;
@ -240,7 +248,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private void remove(final int index) {
commandExecutor.write(getName(), new SyncOperation<V>() {
commandExecutor.write(getName(), codec, new SyncOperation<V>() {
@Override
public V execute(Codec codec, RedisConnection conn) {
if (index == 0) {
@ -268,18 +276,18 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private V get(final int index) {
return commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), index);
return commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), index);
}
@Override
public Object[] toArray() {
List<V> res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1);
List<V> res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1);
return res.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<V> res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1);
List<V> res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1);
return res.toArray(a);
}
@ -293,7 +301,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public boolean add(final V value) {
return commandExecutor.write(getName(), new SyncOperation<Boolean>() {
return commandExecutor.write(getName(), codec, new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return add(value, codec, conn);
@ -458,7 +466,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public boolean remove(final Object value) {
return commandExecutor.write(getName(), new SyncOperation<Boolean>() {
return commandExecutor.write(getName(), codec, new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return remove(value, codec, conn);
@ -573,7 +581,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V first() {
V res = commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), 0);
V res = commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), 0);
if (res == null) {
throw new NoSuchElementException();
}
@ -582,7 +590,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V last() {
V res = commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), -1);
V res = commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), -1);
if (res == null) {
throw new NoSuchElementException();
}
@ -602,7 +610,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
String className = comparator.getClass().getName();
final String comparatorSign = className + ":" + calcClassSign(className);
Boolean res = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
Boolean res = commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('llen', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[1]); return true; "
+ "else return false; end",
Arrays.<Object>asList(getName(), getComparatorKeyName()), comparatorSign);

@ -19,6 +19,7 @@ import java.util.Collections;
import java.util.List;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
@ -38,10 +39,16 @@ public class RedissonTopic<M> implements RTopic<M> {
final CommandExecutor commandExecutor;
private final String name;
private final Codec codec;
protected RedissonTopic(CommandExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected RedissonTopic(Codec codec, CommandExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
}
public List<String> getChannelNames() {
@ -55,7 +62,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public Future<Long> publishAsync(M message) {
return commandExecutor.writeAsync(name, RedisCommands.PUBLISH, name, message);
return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, message);
}
@Override
@ -70,7 +77,7 @@ public class RedissonTopic<M> implements RTopic<M> {
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name, codec);
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);

@ -135,6 +135,10 @@ public class RedisConnection implements RedisCommands {
return closed;
}
public void forceReconnect() {
channel.close();
}
public ChannelFuture closeAsync() {
setClosed(true);
return channel.close();

@ -62,7 +62,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
public static final char LF = '\n';
private static final char ZERO = '0';
// no need concurrent map responses are coming consecutive
// It is not needed to use concurrent map because responses are coming consecutive
private final Map<String, MultiDecoder<Object>> messageDecoders = new HashMap<String, MultiDecoder<Object>>();
private final Map<String, CommandData<Object, Object>> channels = PlatformDependent.newConcurrentHashMap();

@ -225,9 +225,12 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public void returnConnection(RedisConnection connection) {
SubscribesConnectionEntry entry = clients.get(connection.getRedisClient());
if (entry.isFreezed() || connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) {
if (entry.isFreezed()) {
connection.closeAsync();
} else {
if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) {
connection.forceReconnect();
}
entry.getConnections().add(connection);
}
entry.getConnectionsSemaphore().release();

@ -244,7 +244,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName());
c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
c.setRefreshConnectionAfterFails(cfg.getRefreshConnectionAfterFails());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());

@ -79,15 +79,15 @@ public interface ConnectionManager {
PubSubConnectionEntry getEntry(String channelName);
PubSubConnectionEntry subscribe(String channelName);
PubSubConnectionEntry subscribe(String channelName, Codec codec);
PubSubConnectionEntry psubscribe(String pattern);
PubSubConnectionEntry psubscribe(String pattern, Codec codec);
<V> void subscribe(RedisPubSubListener<V> listener, String channelName);
void unsubscribe(String channelName);
Codec unsubscribe(String channelName);
void punsubscribe(String channelName);
Codec punsubscribe(String channelName);
void shutdown();

@ -221,7 +221,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public PubSubConnectionEntry subscribe(String channelName) {
public PubSubConnectionEntry subscribe(String channelName, Codec codec) {
// multiple channel names per PubSubConnections allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
@ -240,7 +240,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(channelName);
return subscribe(channelName, codec);
}
entry.subscribe(codec, channelName);
return entry;
@ -262,7 +262,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(channelName);
return subscribe(channelName, codec);
}
entry.subscribe(codec, channelName);
return entry;
@ -270,7 +270,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public PubSubConnectionEntry psubscribe(String channelName) {
public PubSubConnectionEntry psubscribe(String channelName, Codec codec) {
// multiple channel names per PubSubConnections allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
@ -289,7 +289,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return psubscribe(channelName);
return psubscribe(channelName, codec);
}
entry.psubscribe(codec, channelName);
return entry;
@ -311,7 +311,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return psubscribe(channelName);
return psubscribe(channelName, codec);
}
entry.psubscribe(codec, channelName);
return entry;
@ -368,13 +368,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public void unsubscribe(final String channelName) {
public Codec unsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return;
return null;
}
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
return entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
@ -393,13 +393,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public void punsubscribe(final String channelName) {
public Codec punsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return;
return null;
}
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
return entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
@ -438,9 +438,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.close();
Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
unsubscribe(channelName);
Codec subscribeCodec = unsubscribe(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = subscribe(channelName);
PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}

@ -175,10 +175,8 @@ public class MasterSlaveEntry {
if (!entry.getClient().equals(connection.getRedisClient())) {
connection.closeAsync();
return;
} else if (connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) {
connection.closeAsync();
entry.getConnectionsSemaphore().release();
return;
} else if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) {
connection.forceReconnect();
}
entry.getConnections().add(connection);

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -32,6 +33,8 @@ import org.redisson.client.protocol.pubsub.PubSubType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
public class PubSubConnectionEntry {
public enum Status {ACTIVE, INACTIVE}
@ -42,6 +45,8 @@ public class PubSubConnectionEntry {
private final Semaphore subscribedChannelsAmount;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
private final Map<String, Codec> channel2Codec = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>();
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
@ -121,11 +126,13 @@ public class PubSubConnectionEntry {
subscribedChannelsAmount.release();
}
public void subscribe(Codec codec, final String channelName) {
public void subscribe(Codec codec, String channelName) {
channel2Codec.put(channelName, codec);
conn.subscribe(codec, channelName);
}
public void psubscribe(Codec codec, final String pattern) {
public void psubscribe(Codec codec, String pattern) {
channel2Codec.put(pattern, codec);
conn.psubscribe(codec, pattern);
}
@ -134,7 +141,7 @@ public class PubSubConnectionEntry {
conn.subscribe(codec, channel);
}
public void unsubscribe(final String channel, RedisPubSubListener listener) {
public Codec unsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public boolean onStatus(PubSubType type, String ch) {
@ -148,6 +155,7 @@ public class PubSubConnectionEntry {
});
conn.addOneShotListener(listener);
conn.unsubscribe(channel);
return channel2Codec.remove(channel);
}
private void removeListeners(String channel) {
@ -163,7 +171,7 @@ public class PubSubConnectionEntry {
subscribedChannelsAmount.release();
}
public void punsubscribe(final String channel, RedisPubSubListener listener) {
public Codec punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public boolean onStatus(PubSubType type, String ch) {
@ -176,6 +184,7 @@ public class PubSubConnectionEntry {
});
conn.addOneShotListener(listener);
conn.punsubscribe(channel);
return channel2Codec.remove(channel);
}

@ -61,7 +61,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName());
c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
c.setRefreshConnectionAfterFails(cfg.getRefreshConnectionAfterFails());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());

@ -48,7 +48,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setPassword(cfg.getPassword());
newconfig.setDatabase(cfg.getDatabase());
newconfig.setClientName(cfg.getClientName());
newconfig.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
newconfig.setRefreshConnectionAfterFails(cfg.getRefreshConnectionAfterFails());
newconfig.setMasterAddress(addr);
newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());

@ -17,6 +17,8 @@ package org.redisson.core;
import java.util.List;
import org.redisson.client.codec.Codec;
import io.netty.util.concurrent.Future;
/**
@ -41,6 +43,8 @@ public interface RBatch {
*/
<V> RBucketAsync<V> getBucket(String name);
<V> RBucketAsync<V> getBucket(String name, Codec codec);
/**
* Returns HyperLogLog object
*
@ -49,6 +53,8 @@ public interface RBatch {
*/
<V> RHyperLogLogAsync<V> getHyperLogLog(String name);
<V> RHyperLogLogAsync<V> getHyperLogLog(String name, Codec codec);
/**
* Returns list instance by name.
*
@ -57,6 +63,8 @@ public interface RBatch {
*/
<V> RListAsync<V> getList(String name);
<V> RListAsync<V> getList(String name, Codec codec);
/**
* Returns map instance by name.
*
@ -65,6 +73,8 @@ public interface RBatch {
*/
<K, V> RMapAsync<K, V> getMap(String name);
<K, V> RMapAsync<K, V> getMap(String name, Codec codec);
/**
* Returns set instance by name.
*
@ -73,6 +83,8 @@ public interface RBatch {
*/
<V> RSetAsync<V> getSet(String name);
<V> RSetAsync<V> getSet(String name, Codec codec);
/**
* Returns topic instance by name.
*
@ -81,6 +93,8 @@ public interface RBatch {
*/
<M> RTopicAsync<M> getTopic(String name);
<M> RTopicAsync<M> getTopic(String name, Codec codec);
/**
* Returns queue instance by name.
*
@ -89,6 +103,8 @@ public interface RBatch {
*/
<V> RQueueAsync<V> getQueue(String name);
<V> RQueueAsync<V> getQueue(String name, Codec codec);
/**
* Returns blocking queue instance by name.
*
@ -97,6 +113,8 @@ public interface RBatch {
*/
<V> RBlockingQueueAsync<V> getBlockingQueue(String name);
<V> RBlockingQueueAsync<V> getBlockingQueue(String name, Codec codec);
/**
* Returns deque instance by name.
*
@ -105,6 +123,8 @@ public interface RBatch {
*/
<V> RDequeAsync<V> getDequeAsync(String name);
<V> RDequeAsync<V> getDequeAsync(String name, Codec codec);
/**
* Returns "atomic long" instance by name.
*
@ -113,7 +133,25 @@ public interface RBatch {
*/
RAtomicLongAsync getAtomicLongAsync(String name);
<V> RScoredSortedSet<V> getScoredSortedSet(String name);
/**
* Returns Redis Sorted Set instance by name
*
* @param name
* @return
*/
<V> RScoredSortedSetAsync<V> getScoredSortedSet(String name);
<V> RScoredSortedSetAsync<V> getScoredSortedSet(String name, Codec codec);
/**
* Returns String based Redis Sorted Set instance by name
* All elements are inserted with the same score during addition,
* in order to force lexicographical ordering
*
* @param name
* @return
*/
RLexSortedSetAsync getLexSortedSet(String name);
/**
* Returns script operations object

@ -17,6 +17,7 @@ package org.redisson.core;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -47,12 +48,20 @@ public interface RScript extends RScriptAsync {
<R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
<R> R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
<R> R evalSha(Mode mode, String shaDigest, ReturnType returnType);
<R> R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType);
<R> R eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> R eval(Mode mode, String luaScript, ReturnType returnType);
<R> R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType);
String scriptLoad(String luaScript);
List<Boolean> scriptExists(String ... shaDigests);

@ -17,6 +17,7 @@ package org.redisson.core;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.RScript.Mode;
import org.redisson.core.RScript.ReturnType;
@ -28,12 +29,20 @@ public interface RScriptAsync {
<R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
<R> Future<R> evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
<R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType);
<R> Future<R> evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType);
<R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> Future<R> evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType);
<R> Future<R> evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType);
Future<String> scriptLoadAsync(String luaScript);
Future<List<Boolean>> scriptExistsAsync(String ... shaDigests);

@ -6,11 +6,27 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.codec.StringCodec;
import org.redisson.core.RBatch;
import org.redisson.core.RListAsync;
import io.netty.util.concurrent.Future;
public class RedissonBatchTest extends BaseTest {
@Test
public void testDifferentCodecs() {
RBatch b = redisson.createBatch();
b.getMap("test1").putAsync("1", "2");
b.getMap("test2", StringCodec.INSTANCE).putAsync("21", "3");
Future<Object> val1 = b.getMap("test1").getAsync("1");
Future<Object> val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21");
b.execute();
Assert.assertEquals("2", val1.getNow());
Assert.assertEquals("3", val2.getNow());
}
@Test
public void testBatchList() {
RBatch b = redisson.createBatch();

Loading…
Cancel
Save