diff --git a/src/main/java/org/redisson/BaseConfig.java b/src/main/java/org/redisson/BaseConfig.java index 4f506b34b..64db8d037 100644 --- a/src/main/java/org/redisson/BaseConfig.java +++ b/src/main/java/org/redisson/BaseConfig.java @@ -69,7 +69,7 @@ class BaseConfig> { setTimeout(config.getTimeout()); setClientName(config.getClientName()); setPingTimeout(config.getPingTimeout()); - setCloseConnectionAfterFailAttempts(config.getCloseConnectionAfterFailAttempts()); + setRefreshConnectionAfterFails(config.getRefreshConnectionAfterFails()); } /** @@ -190,16 +190,16 @@ class BaseConfig> { } /** - * Close connection if it has failAttemptsAmount + * Reconnect connection if it has failAttemptsAmount * fails in a row during command sending. Turned off by default. * * @param failAttemptsAmount */ - public T setCloseConnectionAfterFailAttempts(int failAttemptsAmount) { + public T setRefreshConnectionAfterFails(int failAttemptsAmount) { this.closeConnectionAfterFailAttempts = failAttemptsAmount; return (T) this; } - public int getCloseConnectionAfterFailAttempts() { + public int getRefreshConnectionAfterFails() { return closeConnectionAfterFailAttempts; } diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 89faa279a..2ce4924b8 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -278,12 +278,12 @@ public class CommandBatchExecutorService extends CommandExecutorService { } @Override - public R read(String key, SyncOperation operation) { + public R read(String key, Codec codec, SyncOperation operation) { throw new UnsupportedOperationException(); } @Override - public R write(String key, SyncOperation operation) { + public R write(String key, Codec codec, SyncOperation operation) { throw new UnsupportedOperationException(); } diff --git a/src/main/java/org/redisson/CommandExecutor.java b/src/main/java/org/redisson/CommandExecutor.java index a0c0da0af..143e4723d 100644 --- a/src/main/java/org/redisson/CommandExecutor.java +++ b/src/main/java/org/redisson/CommandExecutor.java @@ -60,9 +60,9 @@ public interface CommandExecutor { R evalWrite(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - R read(String key, SyncOperation operation); + R read(String key, Codec codec, SyncOperation operation); - R write(String key, SyncOperation operation); + R write(String key, Codec codec, SyncOperation operation); R read(String key, Codec codec, RedisCommand command, Object ... params); diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 85a0649e8..d09e80365 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -236,17 +236,17 @@ public class CommandExecutorService implements CommandExecutor { return readAsync(key, connectionManager.getCodec(), command, params); } - public R write(String key, SyncOperation operation) { + public R write(String key, Codec codec, SyncOperation operation) { int slot = connectionManager.calcSlot(key); - return async(false, slot, operation, 0); + return async(false, codec, slot, operation, 0); } - public R read(String key, SyncOperation operation) { + public R read(String key, Codec codec, SyncOperation operation) { int slot = connectionManager.calcSlot(key); - return async(true, slot, operation, 0); + return async(true, codec, slot, operation, 0); } - private R async(boolean readOnlyMode, int slot, SyncOperation operation, int attempt) { + private R async(boolean readOnlyMode, Codec codec, int slot, SyncOperation operation, int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { return null; } @@ -259,15 +259,15 @@ public class CommandExecutorService implements CommandExecutor { connection = connectionManager.connectionWriteOp(slot); } try { - return operation.execute(connectionManager.getCodec(), connection); + return operation.execute(codec, connection); } catch (RedisMovedException e) { - return async(readOnlyMode, e.getSlot(), operation, attempt); + return async(readOnlyMode, codec, e.getSlot(), operation, attempt); } catch (RedisTimeoutException e) { if (attempt == connectionManager.getConfig().getRetryAttempts()) { throw e; } attempt++; - return async(readOnlyMode, slot, operation, attempt); + return async(readOnlyMode, codec, slot, operation, attempt); } finally { connectionManager.getShutdownLatch().release(); if (readOnlyMode) { @@ -286,7 +286,7 @@ public class CommandExecutorService implements CommandExecutor { Thread.currentThread().interrupt(); } attempt++; - return async(readOnlyMode, slot, operation, attempt); + return async(readOnlyMode, codec, slot, operation, attempt); } } diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 943a4ca73..bad467b4d 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ClusterConnectionManager; import org.redisson.connection.ConnectionManager; @@ -123,6 +124,12 @@ public class Redisson implements RedissonClient { return new RedissonBucket(commandExecutor, name); } + @Override + public RBucket getBucket(String name, Codec codec) { + return new RedissonBucket(codec, commandExecutor, name); + } + + /** * Returns a list of object holder by a key pattern * @@ -157,6 +164,12 @@ public class Redisson implements RedissonClient { return new RedissonHyperLogLog(commandExecutor, name); } + @Override + public RHyperLogLog getHyperLogLog(String name, Codec codec) { + return new RedissonHyperLogLog(codec, commandExecutor, name); + } + + /** * Returns distributed list instance by name. * @@ -168,6 +181,11 @@ public class Redisson implements RedissonClient { return new RedissonList(commandExecutor, name); } + @Override + public RList getList(String name, Codec codec) { + return new RedissonList(codec, commandExecutor, name); + } + /** * Returns distributed map instance by name. * @@ -179,6 +197,11 @@ public class Redisson implements RedissonClient { return new RedissonMap(commandExecutor, name); } + @Override + public RMap getMap(String name, Codec codec) { + return new RedissonMap(codec, commandExecutor, name); + } + /** * Returns distributed lock instance by name. * @@ -201,11 +224,18 @@ public class Redisson implements RedissonClient { return new RedissonSet(commandExecutor, name); } + @Override + public RSet getSet(String name, Codec codec) { + return new RedissonSet(codec, commandExecutor, name); + } + + /** * Returns script with eval-operations support * * @return */ + @Override public RScript getScript() { return new RedissonScript(commandExecutor); } @@ -221,10 +251,37 @@ public class Redisson implements RedissonClient { return new RedissonSortedSet(commandExecutor, name); } + @Override + public RSortedSet getSortedSet(String name, Codec codec) { + return new RedissonSortedSet(codec, commandExecutor, name); + } + + + /** + * Returns Redis Sorted Set instance by name + * + * @param name + * @return + */ + @Override public RScoredSortedSet getScoredSortedSet(String name) { return new RedissonScoredSortedSet(commandExecutor, name); } + @Override + public RScoredSortedSet getScoredSortedSet(String name, Codec codec) { + return new RedissonScoredSortedSet(codec, commandExecutor, name); + } + + /** + * Returns String based Redis Sorted Set instance by name + * All elements are inserted with the same score during addition, + * in order to force lexicographical ordering + * + * @param name + * @return + */ + @Override public RLexSortedSet getLexSortedSet(String name) { return new RedissonLexSortedSet(commandExecutor, name); } @@ -240,6 +297,11 @@ public class Redisson implements RedissonClient { return new RedissonTopic(commandExecutor, name); } + @Override + public RTopic getTopic(String name, Codec codec) { + return new RedissonTopic(codec, commandExecutor, name); + } + /** * Returns topic instance satisfies by pattern name. * @@ -256,6 +318,12 @@ public class Redisson implements RedissonClient { return new RedissonPatternTopic(commandExecutor, pattern); } + @Override + public RPatternTopic getPatternTopic(String pattern, Codec codec) { + return new RedissonPatternTopic(codec, commandExecutor, pattern); + } + + /** * Returns distributed queue instance by name. * @@ -267,6 +335,11 @@ public class Redisson implements RedissonClient { return new RedissonQueue(commandExecutor, name); } + @Override + public RQueue getQueue(String name, Codec codec) { + return new RedissonQueue(codec, commandExecutor, name); + } + /** * Returns distributed blocking queue instance by name. * @@ -278,6 +351,11 @@ public class Redisson implements RedissonClient { return new RedissonBlockingQueue(commandExecutor, name); } + @Override + public RBlockingQueue getBlockingQueue(String name, Codec codec) { + return new RedissonBlockingQueue(codec, commandExecutor, name); + } + /** * Returns distributed deque instance by name. * @@ -289,6 +367,11 @@ public class Redisson implements RedissonClient { return new RedissonDeque(commandExecutor, name); } + @Override + public RDeque getDeque(String name, Codec codec) { + return new RedissonDeque(codec, commandExecutor, name); + } + /** * Returns distributed "atomic long" instance by name. * diff --git a/src/main/java/org/redisson/RedissonBatch.java b/src/main/java/org/redisson/RedissonBatch.java index 8e658375a..51f25f890 100644 --- a/src/main/java/org/redisson/RedissonBatch.java +++ b/src/main/java/org/redisson/RedissonBatch.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.List; +import org.redisson.client.codec.Codec; import org.redisson.connection.ConnectionManager; import org.redisson.core.RAtomicLongAsync; import org.redisson.core.RBatch; @@ -25,10 +26,11 @@ import org.redisson.core.RBucketAsync; import org.redisson.core.RDequeAsync; import org.redisson.core.RHyperLogLogAsync; import org.redisson.core.RKeysAsync; +import org.redisson.core.RLexSortedSetAsync; import org.redisson.core.RListAsync; import org.redisson.core.RMapAsync; import org.redisson.core.RQueueAsync; -import org.redisson.core.RScoredSortedSet; +import org.redisson.core.RScoredSortedSetAsync; import org.redisson.core.RScriptAsync; import org.redisson.core.RSetAsync; import org.redisson.core.RTopicAsync; @@ -48,56 +50,101 @@ public class RedissonBatch implements RBatch { return new RedissonBucket(executorService, name); } + public RBucketAsync getBucket(String name, Codec codec) { + return new RedissonBucket(codec, executorService, name); + } + @Override public RHyperLogLogAsync getHyperLogLog(String name) { return new RedissonHyperLogLog(executorService, name); } + public RHyperLogLogAsync getHyperLogLog(String name, Codec codec) { + return new RedissonHyperLogLog(codec, executorService, name); + } + @Override public RListAsync getList(String name) { return new RedissonList(executorService, name); } + public RListAsync getList(String name, Codec codec) { + return new RedissonList(codec, executorService, name); + } + @Override public RMapAsync getMap(String name) { return new RedissonMap(executorService, name); } + public RMapAsync getMap(String name, Codec codec) { + return new RedissonMap(codec, executorService, name); + } + @Override public RSetAsync getSet(String name) { return new RedissonSet(executorService, name); } + public RSetAsync getSet(String name, Codec codec) { + return new RedissonSet(codec, executorService, name); + } + @Override public RTopicAsync getTopic(String name) { return new RedissonTopic(executorService, name); } + public RTopicAsync getTopic(String name, Codec codec) { + return new RedissonTopic(codec, executorService, name); + } + @Override public RQueueAsync getQueue(String name) { return new RedissonQueue(executorService, name); } + public RQueueAsync getQueue(String name, Codec codec) { + return new RedissonQueue(codec, executorService, name); + } + @Override public RBlockingQueueAsync getBlockingQueue(String name) { return new RedissonBlockingQueue(executorService, name); } + public RBlockingQueueAsync getBlockingQueue(String name, Codec codec) { + return new RedissonBlockingQueue(codec, executorService, name); + } + @Override public RDequeAsync getDequeAsync(String name) { return new RedissonDeque(executorService, name); } + public RDequeAsync getDequeAsync(String name, Codec codec) { + return new RedissonDeque(codec, executorService, name); + } + @Override public RAtomicLongAsync getAtomicLongAsync(String name) { return new RedissonAtomicLong(executorService, name); } @Override - public RScoredSortedSet getScoredSortedSet(String name) { + public RScoredSortedSetAsync getScoredSortedSet(String name) { return new RedissonScoredSortedSet(executorService, name); } + public RScoredSortedSetAsync getScoredSortedSet(String name, Codec codec) { + return new RedissonScoredSortedSet(codec, executorService, name); + } + + @Override + public RLexSortedSetAsync getLexSortedSet(String name) { + return new RedissonLexSortedSet(executorService, name); + } + @Override public RScriptAsync getScript() { return new RedissonScript(executorService); diff --git a/src/main/java/org/redisson/RedissonBlockingQueue.java b/src/main/java/org/redisson/RedissonBlockingQueue.java index 71945683c..bcbff591e 100644 --- a/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -21,6 +21,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.decoder.ListDrainToDecoder; @@ -43,6 +44,10 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock super(commandExecutor, name); } + protected RedissonBlockingQueue(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + @Override public Future putAsync(V e) { return offerAsync(e); @@ -60,7 +65,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public Future takeAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), 0); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0); } @Override @@ -71,7 +76,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public Future pollAsync(long timeout, TimeUnit unit) { - return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout)); } @Override @@ -88,7 +93,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public Future pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { - return commandExecutor.writeAsync(getName(), RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout)); } @Override @@ -113,7 +118,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock throw new NullPointerException(); } - return commandExecutor.evalWriteAsync(getName(), new RedisCommand("EVAL", new ListDrainToDecoder(c)), + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), "local vals = redis.call('lrange', KEYS[1], 0, -1); " + "redis.call('ltrim', KEYS[1], -1, 0); " + "return vals", Collections.singletonList(getName())); @@ -133,7 +138,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock if (c == null) { throw new NullPointerException(); } - return commandExecutor.evalWriteAsync(getName(), new RedisCommand("EVAL", new ListDrainToDecoder(c)), + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " + "redis.call('ltrim', KEYS[1], elemNum + 1, -1); " + diff --git a/src/main/java/org/redisson/RedissonBucket.java b/src/main/java/org/redisson/RedissonBucket.java index eae552d7b..798846584 100644 --- a/src/main/java/org/redisson/RedissonBucket.java +++ b/src/main/java/org/redisson/RedissonBucket.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.concurrent.TimeUnit; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RBucket; @@ -28,6 +29,10 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { super(connectionManager, name); } + protected RedissonBucket(Codec codec, CommandExecutor connectionManager, String name) { + super(codec, connectionManager, name); + } + @Override public V get() { return get(getAsync()); @@ -35,7 +40,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { @Override public Future getAsync() { - return commandExecutor.readAsync(getName(), RedisCommands.GET, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.GET, getName()); } @Override @@ -45,7 +50,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { @Override public Future setAsync(V value) { - return commandExecutor.writeAsync(getName(), RedisCommands.SET, getName(), value); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SET, getName(), value); } @Override @@ -55,7 +60,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { @Override public Future setAsync(V value, long timeToLive, TimeUnit timeUnit) { - return commandExecutor.writeAsync(getName(), RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value); } @Override @@ -65,7 +70,7 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { @Override public Future existsAsync() { - return commandExecutor.readAsync(getName(), RedisCommands.EXISTS, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.EXISTS, getName()); } } diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index c8149529a..d614143ae 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -18,6 +18,7 @@ package org.redisson; import java.util.Collection; import java.util.List; +import org.redisson.client.codec.Codec; import org.redisson.core.ClusterNode; import org.redisson.core.Node; import org.redisson.core.NodesGroup; @@ -53,6 +54,8 @@ public interface RedissonClient { */ RBucket getBucket(String name); + RBucket getBucket(String name, Codec codec); + /** * Returns a list of object holder by a key pattern */ @@ -66,6 +69,8 @@ public interface RedissonClient { */ RHyperLogLog getHyperLogLog(String name); + RHyperLogLog getHyperLogLog(String name, Codec codec); + /** * Returns list instance by name. * @@ -74,6 +79,8 @@ public interface RedissonClient { */ RList getList(String name); + RList getList(String name, Codec codec); + /** * Returns map instance by name. * @@ -82,6 +89,8 @@ public interface RedissonClient { */ RMap getMap(String name); + RMap getMap(String name, Codec codec); + /** * Returns lock instance by name. * @@ -98,6 +107,8 @@ public interface RedissonClient { */ RSet getSet(String name); + RSet getSet(String name, Codec codec); + /** * Returns sorted set instance by name. * @@ -106,8 +117,26 @@ public interface RedissonClient { */ RSortedSet getSortedSet(String name); + RSortedSet getSortedSet(String name, Codec codec); + + /** + * Returns Redis Sorted Set instance by name + * + * @param name + * @return + */ RScoredSortedSet getScoredSortedSet(String name); + RScoredSortedSet getScoredSortedSet(String name, Codec codec); + + /** + * Returns String based Redis Sorted Set instance by name + * All elements are inserted with the same score during addition, + * in order to force lexicographical ordering + * + * @param name + * @return + */ RLexSortedSet getLexSortedSet(String name); /** @@ -118,6 +147,8 @@ public interface RedissonClient { */ RTopic getTopic(String name); + RTopic getTopic(String name, Codec codec); + /** * Returns topic instance satisfies by pattern name. * @@ -131,6 +162,8 @@ public interface RedissonClient { */ RPatternTopic getPatternTopic(String pattern); + RPatternTopic getPatternTopic(String pattern, Codec codec); + /** * Returns queue instance by name. * @@ -139,6 +172,8 @@ public interface RedissonClient { */ RQueue getQueue(String name); + RQueue getQueue(String name, Codec codec); + /** * Returns blocking queue instance by name. * @@ -147,6 +182,8 @@ public interface RedissonClient { */ RBlockingQueue getBlockingQueue(String name); + RBlockingQueue getBlockingQueue(String name, Codec codec); + /** * Returns deque instance by name. * @@ -155,6 +192,8 @@ public interface RedissonClient { */ RDeque getDeque(String name); + RDeque getDeque(String name, Codec codec); + /** * Returns "atomic long" instance by name. * diff --git a/src/main/java/org/redisson/RedissonDeque.java b/src/main/java/org/redisson/RedissonDeque.java index 00b1ea21e..d9a1c5823 100644 --- a/src/main/java/org/redisson/RedissonDeque.java +++ b/src/main/java/org/redisson/RedissonDeque.java @@ -18,6 +18,7 @@ package org.redisson; import java.util.Iterator; import java.util.NoSuchElementException; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -47,6 +48,10 @@ public class RedissonDeque extends RedissonQueue implements RDeque { super(commandExecutor, name); } + public RedissonDeque(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + @Override public void addFirst(V e) { get(addFirstAsync(e)); @@ -54,7 +59,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future addFirstAsync(V e) { - return commandExecutor.writeAsync(getName(), LPUSH_VOID, getName(), e); + return commandExecutor.writeAsync(getName(), codec, LPUSH_VOID, getName(), e); } @Override @@ -64,7 +69,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future addLastAsync(V e) { - return commandExecutor.writeAsync(getName(), RPUSH_VOID, getName(), e); + return commandExecutor.writeAsync(getName(), codec, RPUSH_VOID, getName(), e); } @@ -106,7 +111,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future getLastAsync() { - return commandExecutor.readAsync(getName(), LRANGE_SINGLE, getName(), -1, -1); + return commandExecutor.readAsync(getName(), codec, LRANGE_SINGLE, getName(), -1, -1); } @Override @@ -125,7 +130,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future offerFirstAsync(V e) { - return commandExecutor.writeAsync(getName(), LPUSH_BOOLEAN, getName(), e); + return commandExecutor.writeAsync(getName(), codec, LPUSH_BOOLEAN, getName(), e); } @Override @@ -170,7 +175,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future pollLastAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName()); } @@ -216,7 +221,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future removeLastAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName()); } @Override diff --git a/src/main/java/org/redisson/RedissonHyperLogLog.java b/src/main/java/org/redisson/RedissonHyperLogLog.java index 1c595b1f4..0fdc0e7ac 100644 --- a/src/main/java/org/redisson/RedissonHyperLogLog.java +++ b/src/main/java/org/redisson/RedissonHyperLogLog.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RHyperLogLog; @@ -31,6 +32,10 @@ public class RedissonHyperLogLog extends RedissonExpirable implements RHyperL super(commandExecutor, name); } + protected RedissonHyperLogLog(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + @Override public boolean add(V obj) { return get(addAsync(obj)); @@ -58,7 +63,7 @@ public class RedissonHyperLogLog extends RedissonExpirable implements RHyperL @Override public Future addAsync(V obj) { - return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, getName(), obj); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFADD, getName(), obj); } @Override @@ -66,12 +71,12 @@ public class RedissonHyperLogLog extends RedissonExpirable implements RHyperL List args = new ArrayList(objects.size() + 1); args.add(getName()); args.addAll(objects); - return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, getName(), args.toArray()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFADD, getName(), args.toArray()); } @Override public Future countAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT, getName()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFCOUNT, getName()); } @Override @@ -79,7 +84,7 @@ public class RedissonHyperLogLog extends RedissonExpirable implements RHyperL List args = new ArrayList(otherLogNames.length + 1); args.add(getName()); args.addAll(Arrays.asList(otherLogNames)); - return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT, args.toArray()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFCOUNT, args.toArray()); } @Override @@ -87,7 +92,7 @@ public class RedissonHyperLogLog extends RedissonExpirable implements RHyperL List args = new ArrayList(otherLogNames.length + 1); args.add(getName()); args.addAll(Arrays.asList(otherLogNames)); - return commandExecutor.writeAsync(getName(), RedisCommands.PFMERGE, args.toArray()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.PFMERGE, args.toArray()); } } diff --git a/src/main/java/org/redisson/RedissonLexSortedSet.java b/src/main/java/org/redisson/RedissonLexSortedSet.java index 100f1a6b8..a068ae010 100644 --- a/src/main/java/org/redisson/RedissonLexSortedSet.java +++ b/src/main/java/org/redisson/RedissonLexSortedSet.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RLexSortedSet; diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index f4cd946c1..1b34b56dc 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; @@ -58,13 +59,17 @@ public class RedissonList extends RedissonExpirable implements RList { super(commandExecutor, name); } + protected RedissonList(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + @Override public int size() { return get(sizeAsync()); } public Future sizeAsync() { - return commandExecutor.readAsync(getName(), LLEN, getName()); + return commandExecutor.readAsync(getName(), codec, LLEN, getName()); } @Override @@ -93,7 +98,7 @@ public class RedissonList extends RedissonExpirable implements RList { } private Future> readAllAsync() { - return commandExecutor.readAsync(getName(), LRANGE, getName(), 0, -1); + return commandExecutor.readAsync(getName(), codec, LRANGE, getName(), 0, -1); } @Override @@ -109,7 +114,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future addAsync(V e) { - return commandExecutor.writeAsync(getName(), RPUSH_BOOLEAN, getName(), e); + return commandExecutor.writeAsync(getName(), codec, RPUSH_BOOLEAN, getName(), e); } @Override @@ -123,7 +128,7 @@ public class RedissonList extends RedissonExpirable implements RList { } protected Future removeAsync(Object o, int count) { - return commandExecutor.writeAsync(getName(), LREM_SINGLE, getName(), count, o); + return commandExecutor.writeAsync(getName(), codec, LREM_SINGLE, getName(), count, o); } protected boolean remove(Object o, int count) { @@ -132,7 +137,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future containsAllAsync(Collection c) { - return commandExecutor.evalReadAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local items = redis.call('lrange', KEYS[1], 0, -1) " + "for i=1, #items do " + "for j = 0, table.getn(ARGV), 1 do " + @@ -166,7 +171,7 @@ public class RedissonList extends RedissonExpirable implements RList { List args = new ArrayList(c.size() + 1); args.add(getName()); args.addAll(c); - Future res = commandExecutor.writeAsync(getName(), RPUSH, args.toArray()); + Future res = commandExecutor.writeAsync(getName(), codec, RPUSH, args.toArray()); res.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -194,7 +199,7 @@ public class RedissonList extends RedissonExpirable implements RList { Collections.reverse(elements); elements.add(0, getName()); - Long newSize = commandExecutor.write(getName(), LPUSH, elements.toArray()); + Long newSize = commandExecutor.write(getName(), codec, LPUSH, elements.toArray()); return newSize != size; } @@ -203,7 +208,7 @@ public class RedissonList extends RedissonExpirable implements RList { List args = new ArrayList(coll.size() + 1); args.add(index); args.addAll(coll); - return commandExecutor.evalWrite(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 5), + return commandExecutor.evalWrite(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 5), "local ind = table.remove(ARGV, 1); " + // index is the first parameter "local tail = redis.call('lrange', KEYS[1], ind, -1); " + "redis.call('ltrim', KEYS[1], 0, ind - 1); " + @@ -219,7 +224,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future removeAllAsync(Collection c) { - return commandExecutor.evalWriteAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local v = false " + "for i = 0, table.getn(ARGV), 1 do " + "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 " @@ -241,7 +246,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future retainAllAsync(Collection c) { - return commandExecutor.evalWriteAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local changed = false " + "local items = redis.call('lrange', KEYS[1], 0, -1) " + "local i = 1 " @@ -273,7 +278,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future getAsync(int index) { - return commandExecutor.readAsync(getName(), LINDEX, getName(), index); + return commandExecutor.readAsync(getName(), codec, LINDEX, getName(), index); } @Override @@ -314,7 +319,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future setAsync(int index, V element) { - return commandExecutor.evalWriteAsync(getName(), new RedisCommand("EVAL", 5), + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", 5), "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + "return v", @@ -329,7 +334,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future fastSetAsync(int index, V element) { - return commandExecutor.writeAsync(getName(), RedisCommands.LSET, getName(), index, element); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LSET, getName(), index, element); } @Override @@ -342,10 +347,10 @@ public class RedissonList extends RedissonExpirable implements RList { checkIndex(index); if (index == 0) { - return commandExecutor.write(getName(), LPOP, getName()); + return commandExecutor.write(getName(), codec, LPOP, getName()); } - return commandExecutor.evalWrite(getName(), EVAL_OBJECT, + return commandExecutor.evalWrite(getName(), codec, EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "local tail = redis.call('lrange', KEYS[1], ARGV[1]);" + "redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" + @@ -365,7 +370,7 @@ public class RedissonList extends RedissonExpirable implements RList { } private Future indexOfAsync(Object o, Convertor convertor) { - return commandExecutor.evalReadAsync(getName(), new RedisCommand("EVAL", convertor, 4), + return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", convertor, 4), "local key = KEYS[1] " + "local obj = ARGV[1] " + "local items = redis.call('lrange', key, 0, -1) " + @@ -390,7 +395,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future lastIndexOfAsync(Object o) { - return commandExecutor.evalReadAsync(getName(), new RedisCommand("EVAL", new IntegerReplayConvertor(), 4), + return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", new IntegerReplayConvertor(), 4), "local key = KEYS[1] " + "local obj = ARGV[1] " + "local items = redis.call('lrange', key, 0, -1) " + @@ -513,7 +518,7 @@ public class RedissonList extends RedissonExpirable implements RList { throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex); } - return commandExecutor.read(getName(), LRANGE, getName(), fromIndex, toIndex - 1); + return commandExecutor.read(getName(), codec, LRANGE, getName(), fromIndex, toIndex - 1); } public String toString() { diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index 96d9cdde8..478da9b1a 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -28,6 +28,7 @@ import java.util.NoSuchElementException; import java.util.Set; import org.redisson.client.RedisClient; +import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -53,12 +54,20 @@ import io.netty.util.concurrent.Future; //TODO implement watching by keys instead of map name public class RedissonMap extends RedissonExpirable implements RMap { - private final RedisCommand EVAL_PUT = new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); + private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 4, ValueType.MAP); + private static final RedisCommand EVAL_PUT = EVAL_REPLACE; protected RedissonMap(CommandExecutor commandExecutor, String name) { super(commandExecutor, name); } + public RedissonMap(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + @Override public int size() { return get(sizeAsync()); @@ -66,7 +75,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future sizeAsync() { - return commandExecutor.readAsync(getName(), RedisCommands.HLEN, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName()); } @Override @@ -76,12 +85,12 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public boolean containsKey(Object key) { - return commandExecutor.read(getName(), RedisCommands.HEXISTS, getName(), key); + return commandExecutor.read(getName(), codec, RedisCommands.HEXISTS, getName(), key); } @Override public Future containsKeyAsync(Object key) { - return commandExecutor.readAsync(getName(), RedisCommands.HEXISTS, getName(), key); + return commandExecutor.readAsync(getName(), codec, RedisCommands.HEXISTS, getName(), key); } @Override @@ -91,7 +100,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future containsValueAsync(Object value) { - return commandExecutor.evalReadAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local s = redis.call('hvals', KEYS[1]);" + "for i = 0, table.getn(s), 1 do " + "if ARGV[1] == s[i] then " @@ -110,7 +119,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { List args = new ArrayList(keys.size() + 1); args.add(getName()); args.addAll(keys); - List list = commandExecutor.read(getName(), RedisCommands.HMGET, args.toArray()); + List list = commandExecutor.read(getName(), codec, RedisCommands.HMGET, args.toArray()); Map result = new HashMap(list.size()); for (int index = 0; index < args.size()-1; index++) { @@ -144,7 +153,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { return; } - commandExecutor.write(getName(), RedisCommands.HMSET, getName(), map); + commandExecutor.write(getName(), codec, RedisCommands.HMSET, getName(), map); } @Override @@ -159,7 +168,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future> keySetAsync() { - return commandExecutor.readAsync(getName(), RedisCommands.HKEYS, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.HKEYS, getName()); } @Override @@ -169,12 +178,12 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future> valuesAsync() { - return commandExecutor.readAsync(getName(), RedisCommands.HVALS, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.HVALS, getName()); } @Override public Set> entrySet() { - Map map = commandExecutor.read(getName(), RedisCommands.HGETALL, getName()); + Map map = commandExecutor.read(getName(), codec, RedisCommands.HGETALL, getName()); return map.entrySet(); } @@ -185,7 +194,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future putIfAbsentAsync(K key, V value) { - return commandExecutor.evalWriteAsync(getName(), EVAL_PUT, + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT, "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return nil else return redis.call('hget', KEYS[1], ARGV[1]) end", Collections.singletonList(getName()), key, value); } @@ -197,8 +206,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future removeAsync(Object key, Object value) { - return commandExecutor.evalWriteAsync(getName(), - new RedisCommand("EVAL", new LongReplayConvertor(), 4, ValueType.MAP), + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then return redis.call('hdel', KEYS[1], ARGV[1]) else return 0 end", Collections.singletonList(getName()), key, value); } @@ -210,9 +218,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future replaceAsync(K key, V oldValue, V newValue) { - return commandExecutor.evalWriteAsync(getName(), - new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, - Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)), + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); return true; else return false; end", Collections.singletonList(getName()), key, oldValue, newValue); } @@ -224,20 +230,19 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future replaceAsync(K key, V value) { - return commandExecutor.evalWriteAsync(getName(), - new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE), + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE, "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v; else return nil; end", Collections.singletonList(getName()), key, value); } @Override public Future getAsync(K key) { - return commandExecutor.readAsync(getName(), RedisCommands.HGET, getName(), key); + return commandExecutor.readAsync(getName(), codec, RedisCommands.HGET, getName(), key); } @Override public Future putAsync(K key, V value) { - return commandExecutor.evalWriteAsync(getName(), EVAL_PUT, + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT, "local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v", Collections.singletonList(getName()), key, value); } @@ -245,15 +250,14 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future removeAsync(K key) { - return commandExecutor.evalWriteAsync(getName(), - new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE), + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE, "local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[1], ARGV[1]); return v", Collections.singletonList(getName()), key); } @Override public Future fastPutAsync(K key, V value) { - return commandExecutor.writeAsync(getName(), RedisCommands.HSET, getName(), key, value); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.HSET, getName(), key, value); } @Override @@ -270,7 +274,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { List args = new ArrayList(keys.length + 1); args.add(getName()); args.addAll(Arrays.asList(keys)); - return commandExecutor.writeAsync(getName(), RedisCommands.HDEL, args.toArray()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.HDEL, args.toArray()); } @Override @@ -279,7 +283,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } private MapScanResult scanIterator(RedisClient client, long startPos) { - return commandExecutor.read(client, getName(), RedisCommands.HSCAN, getName(), startPos); + return commandExecutor.read(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos); } private Iterator> iterator() { diff --git a/src/main/java/org/redisson/RedissonPatternTopic.java b/src/main/java/org/redisson/RedissonPatternTopic.java index ffe47b1ac..508d9299c 100644 --- a/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/src/main/java/org/redisson/RedissonPatternTopic.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; import org.redisson.client.RedisPubSubListener; +import org.redisson.client.codec.Codec; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.PatternMessageListener; import org.redisson.core.PatternStatusListener; @@ -35,10 +36,16 @@ public class RedissonPatternTopic implements RPatternTopic { final CommandExecutor commandExecutor; private final String name; + private final Codec codec; protected RedissonPatternTopic(CommandExecutor commandExecutor, String name) { + this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); + } + + protected RedissonPatternTopic(Codec codec, CommandExecutor commandExecutor, String name) { this.commandExecutor = commandExecutor; this.name = name; + this.codec = codec; } @Override @@ -53,7 +60,7 @@ public class RedissonPatternTopic implements RPatternTopic { } private int addListener(RedisPubSubListener pubSubListener) { - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name, codec); synchronized (entry) { if (entry.isActive()) { entry.addListener(name, pubSubListener); diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index e5da5bdcd..ea1f0b181 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.NoSuchElementException; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RQueue; @@ -35,6 +36,10 @@ public class RedissonQueue extends RedissonList implements RQueue { super(commandExecutor, name); } + protected RedissonQueue(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + @Override public boolean offer(V e) { return add(e); @@ -68,7 +73,7 @@ public class RedissonQueue extends RedissonList implements RQueue { @Override public Future pollAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.LPOP, getName()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPOP, getName()); } @Override @@ -98,7 +103,7 @@ public class RedissonQueue extends RedissonList implements RQueue { @Override public Future pollLastAndOfferFirstToAsync(String queueName) { - return commandExecutor.writeAsync(getName(), RedisCommands.RPOPLPUSH, getName(), queueName); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOPLPUSH, getName(), queueName); } @Override diff --git a/src/main/java/org/redisson/RedissonScript.java b/src/main/java/org/redisson/RedissonScript.java index 7ed84a90d..3179d482a 100644 --- a/src/main/java/org/redisson/RedissonScript.java +++ b/src/main/java/org/redisson/RedissonScript.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RScript; @@ -64,63 +65,99 @@ public class RedissonScript implements RScript { @Override public R eval(Mode mode, String luaScript, ReturnType returnType) { - return eval(null, mode, luaScript, returnType); + return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType); } - public R eval(String key, Mode mode, String luaScript, ReturnType returnType) { - return eval(key, mode, luaScript, returnType, Collections.emptyList()); + @Override + public R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType) { + return eval(null, mode, codec, luaScript, returnType); + } + + + public R eval(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType) { + return eval(key, mode, codec, luaScript, returnType, Collections.emptyList()); } @Override public R eval(Mode mode, String luaScript, ReturnType returnType, List keys, Object... values) { - return eval(null, mode, luaScript, returnType, keys, values); + return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, keys, values); + } + + @Override + public R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType, List keys, Object... values) { + return eval(null, mode, codec, luaScript, returnType, keys, values); } - public R eval(String key, Mode mode, String luaScript, ReturnType returnType, List keys, Object... values) { - return (R) commandExecutor.get(evalAsync(key, mode, luaScript, returnType, keys, values)); + public R eval(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType, List keys, Object... values) { + return (R) commandExecutor.get(evalAsync(key, mode, codec, luaScript, returnType, keys, values)); } @Override public Future evalAsync(Mode mode, String luaScript, ReturnType returnType, List keys, Object... values) { - return evalAsync(null, mode, luaScript, returnType, keys, values); + return evalAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, keys, values); } - public Future evalAsync(String key, Mode mode, String luaScript, ReturnType returnType, List keys, Object... values) { + @Override + public Future evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType, List keys, Object... values) { + return evalAsync(null, mode, codec, luaScript, returnType, keys, values); + } + + public Future evalAsync(String key, Mode mode, Codec codec, String luaScript, ReturnType returnType, List keys, Object... values) { if (mode == Mode.READ_ONLY) { - return commandExecutor.evalReadAsync(key, returnType.getCommand(), luaScript, keys, values); + return commandExecutor.evalReadAsync(key, codec, returnType.getCommand(), luaScript, keys, values); } - return commandExecutor.evalWriteAsync(key, returnType.getCommand(), luaScript, keys, values); + return commandExecutor.evalWriteAsync(key, codec, returnType.getCommand(), luaScript, keys, values); } @Override public R evalSha(Mode mode, String shaDigest, ReturnType returnType) { - return evalSha(null, mode, shaDigest, returnType); + return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType); + } + + @Override + public R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType) { + return evalSha(null, mode, codec, shaDigest, returnType); } public R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType) { - return evalSha(key, mode, shaDigest, returnType, Collections.emptyList()); + return evalSha(key, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, Collections.emptyList()); + } + + public R evalSha(String key, Mode mode, Codec codec, String shaDigest, ReturnType returnType) { + return evalSha(key, mode, codec, shaDigest, returnType, Collections.emptyList()); } + @Override public R evalSha(Mode mode, String shaDigest, ReturnType returnType, List keys, Object... values) { - return evalSha(null, mode, shaDigest, returnType, keys, values); + return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, keys, values); } - public R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List keys, Object... values) { - return (R) commandExecutor.get(evalShaAsync(key, mode, shaDigest, returnType, keys, values)); + @Override + public R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List keys, Object... values) { + return evalSha(null, mode, codec, shaDigest, returnType, keys, values); + } + + public R evalSha(String key, Mode mode, Codec codec, String shaDigest, ReturnType returnType, List keys, Object... values) { + return (R) commandExecutor.get(evalShaAsync(key, mode, codec, shaDigest, returnType, keys, values)); } @Override public Future evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List keys, Object... values) { - return evalShaAsync(null, mode, shaDigest, returnType, keys, values); + return evalShaAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, keys, values); + } + + @Override + public Future evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List keys, Object... values) { + return evalShaAsync(null, mode, codec, shaDigest, returnType, keys, values); } - public Future evalShaAsync(String key, Mode mode, String shaDigest, ReturnType returnType, List keys, Object... values) { + public Future evalShaAsync(String key, Mode mode, Codec codec, String shaDigest, ReturnType returnType, List keys, Object... values) { RedisCommand command = new RedisCommand(returnType.getCommand(), "EVALSHA"); if (mode == Mode.READ_ONLY) { - return commandExecutor.evalReadAsync(key, command, shaDigest, keys, values); + return commandExecutor.evalReadAsync(key, codec, command, shaDigest, keys, values); } - return commandExecutor.evalWriteAsync(key, command, shaDigest, keys, values); + return commandExecutor.evalWriteAsync(key, codec, command, shaDigest, keys, values); } @Override @@ -196,12 +233,22 @@ public class RedissonScript implements RScript { @Override public Future evalShaAsync(Mode mode, String shaDigest, ReturnType returnType) { - return evalShaAsync(null, mode, shaDigest, returnType, Collections.emptyList()); + return evalShaAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, Collections.emptyList()); + } + + @Override + public Future evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType) { + return evalShaAsync(null, mode, codec, shaDigest, returnType, Collections.emptyList()); } @Override public Future evalAsync(Mode mode, String luaScript, ReturnType returnType) { - return evalAsync(null, mode, luaScript, returnType, Collections.emptyList()); + return evalAsync(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, Collections.emptyList()); + } + + @Override + public Future evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType) { + return evalAsync(null, mode, codec, luaScript, returnType, Collections.emptyList()); } } diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index 7a38567e0..c6e20b7c0 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.NoSuchElementException; import org.redisson.client.RedisClient; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; @@ -40,10 +41,16 @@ import io.netty.util.concurrent.Future; */ public class RedissonSet extends RedissonExpirable implements RSet { + private static final RedisCommand EVAL_OBJECTS = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4); + protected RedissonSet(CommandExecutor commandExecutor, String name) { super(commandExecutor, name); } + public RedissonSet(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + @Override public int size() { return get(sizeAsync()); @@ -51,7 +58,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future sizeAsync() { - return commandExecutor.readAsync(getName(), RedisCommands.SCARD, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.SCARD, getName()); } @Override @@ -66,11 +73,11 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future containsAsync(Object o) { - return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o); + return commandExecutor.readAsync(getName(), codec, RedisCommands.SISMEMBER, getName(), o); } private ListScanResult scanIterator(RedisClient client, long startPos) { - return commandExecutor.read(client, getName(), RedisCommands.SSCAN, getName(), startPos); + return commandExecutor.read(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); } @Override @@ -128,7 +135,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { } private Future> readAllAsync() { - return commandExecutor.readAsync(getName(), RedisCommands.SMEMBERS, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.SMEMBERS, getName()); } @Override @@ -150,7 +157,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future addAsync(V e) { - return commandExecutor.writeAsync(getName(), RedisCommands.SADD_SINGLE, getName(), e); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_SINGLE, getName(), e); } @Override @@ -160,12 +167,12 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future removeRandomAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.SPOP_SINGLE, getName()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP_SINGLE, getName()); } @Override public Future removeAsync(Object o) { - return commandExecutor.writeAsync(getName(), RedisCommands.SREM_SINGLE, getName(), o); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SREM_SINGLE, getName(), o); } @Override @@ -180,7 +187,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future containsAllAsync(Collection c) { - return commandExecutor.evalReadAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalReadAsync(getName(), codec, EVAL_OBJECTS, "local s = redis.call('smembers', KEYS[1]);" + "for i = 0, table.getn(s), 1 do " + "for j = 0, table.getn(ARGV), 1 do " @@ -206,7 +213,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { List args = new ArrayList(c.size() + 1); args.add(getName()); args.addAll(c); - return commandExecutor.writeAsync(getName(), RedisCommands.SADD, args.toArray()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD, args.toArray()); } @Override @@ -216,7 +223,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future retainAllAsync(Collection c) { - return commandExecutor.evalWriteAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS, "local changed = false " + "local s = redis.call('smembers', KEYS[1]) " + "local i = 0 " @@ -241,7 +248,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future removeAllAsync(Collection c) { - return commandExecutor.evalWriteAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS, "local v = false " + "for i = 0, table.getn(ARGV), 1 do " + "if redis.call('srem', KEYS[1], ARGV[i]) == 1 " diff --git a/src/main/java/org/redisson/RedissonSortedSet.java b/src/main/java/org/redisson/RedissonSortedSet.java index 1087d141d..0705156bc 100644 --- a/src/main/java/org/redisson/RedissonSortedSet.java +++ b/src/main/java/org/redisson/RedissonSortedSet.java @@ -102,8 +102,16 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + commandExecutor.read(getName(), codec, new SyncOperation() { @Override public Void execute(Codec codec, RedisConnection conn) { loadComparator(conn); @@ -155,7 +163,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet() { + return commandExecutor.read(getName(), codec, new SyncOperation() { @Override public Boolean execute(Codec codec, RedisConnection conn) { return binarySearch((V)o, codec, conn).getIndex() >= 0; @@ -240,7 +248,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + commandExecutor.write(getName(), codec, new SyncOperation() { @Override public V execute(Codec codec, RedisConnection conn) { if (index == 0) { @@ -268,18 +276,18 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1); + List res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); return res.toArray(); } @Override public T[] toArray(T[] a) { - List res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1); + List res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); return res.toArray(a); } @@ -293,7 +301,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + return commandExecutor.write(getName(), codec, new SyncOperation() { @Override public Boolean execute(Codec codec, RedisConnection conn) { return add(value, codec, conn); @@ -458,7 +466,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + return commandExecutor.write(getName(), codec, new SyncOperation() { @Override public Boolean execute(Codec codec, RedisConnection conn) { return remove(value, codec, conn); @@ -573,7 +581,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSetasList(getName(), getComparatorKeyName()), comparatorSign); diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index a24d3ad5a..6332f3a05 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; import org.redisson.client.RedisPubSubListener; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.MessageListener; @@ -38,10 +39,16 @@ public class RedissonTopic implements RTopic { final CommandExecutor commandExecutor; private final String name; + private final Codec codec; protected RedissonTopic(CommandExecutor commandExecutor, String name) { + this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); + } + + protected RedissonTopic(Codec codec, CommandExecutor commandExecutor, String name) { this.commandExecutor = commandExecutor; this.name = name; + this.codec = codec; } public List getChannelNames() { @@ -55,7 +62,7 @@ public class RedissonTopic implements RTopic { @Override public Future publishAsync(M message) { - return commandExecutor.writeAsync(name, RedisCommands.PUBLISH, name, message); + return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, message); } @Override @@ -70,7 +77,7 @@ public class RedissonTopic implements RTopic { } private int addListener(RedisPubSubListener pubSubListener) { - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name, codec); synchronized (entry) { if (entry.isActive()) { entry.addListener(name, pubSubListener); diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 3526fdbc3..2ccb9412c 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -135,6 +135,10 @@ public class RedisConnection implements RedisCommands { return closed; } + public void forceReconnect() { + channel.close(); + } + public ChannelFuture closeAsync() { setClosed(true); return channel.close(); diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 46f7c90af..5398693fb 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -62,7 +62,7 @@ public class CommandDecoder extends ReplayingDecoder { public static final char LF = '\n'; private static final char ZERO = '0'; - // no need concurrent map responses are coming consecutive + // It is not needed to use concurrent map because responses are coming consecutive private final Map> messageDecoders = new HashMap>(); private final Map> channels = PlatformDependent.newConcurrentHashMap(); diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 11d3d89f2..2f0588f05 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -225,9 +225,12 @@ abstract class BaseLoadBalancer implements LoadBalancer { public void returnConnection(RedisConnection connection) { SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); - if (entry.isFreezed() || connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) { + if (entry.isFreezed()) { connection.closeAsync(); } else { + if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) { + connection.forceReconnect(); + } entry.getConnections().add(connection); } entry.getConnectionsSemaphore().release(); diff --git a/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/src/main/java/org/redisson/connection/ClusterConnectionManager.java index a1fe7edb3..557ccc318 100644 --- a/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -244,7 +244,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { c.setPassword(cfg.getPassword()); c.setDatabase(cfg.getDatabase()); c.setClientName(cfg.getClientName()); - c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts()); + c.setRefreshConnectionAfterFails(cfg.getRefreshConnectionAfterFails()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index f7bcf5d9e..c3ecea282 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -79,15 +79,15 @@ public interface ConnectionManager { PubSubConnectionEntry getEntry(String channelName); - PubSubConnectionEntry subscribe(String channelName); + PubSubConnectionEntry subscribe(String channelName, Codec codec); - PubSubConnectionEntry psubscribe(String pattern); + PubSubConnectionEntry psubscribe(String pattern, Codec codec); void subscribe(RedisPubSubListener listener, String channelName); - void unsubscribe(String channelName); + Codec unsubscribe(String channelName); - void punsubscribe(String channelName); + Codec punsubscribe(String channelName); void shutdown(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index b64de1e78..eb6676f49 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -221,7 +221,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public PubSubConnectionEntry subscribe(String channelName) { + public PubSubConnectionEntry subscribe(String channelName, Codec codec) { // multiple channel names per PubSubConnections allowed PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { @@ -240,7 +240,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - return subscribe(channelName); + return subscribe(channelName, codec); } entry.subscribe(codec, channelName); return entry; @@ -262,7 +262,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - return subscribe(channelName); + return subscribe(channelName, codec); } entry.subscribe(codec, channelName); return entry; @@ -270,7 +270,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public PubSubConnectionEntry psubscribe(String channelName) { + public PubSubConnectionEntry psubscribe(String channelName, Codec codec) { // multiple channel names per PubSubConnections allowed PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { @@ -289,7 +289,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - return psubscribe(channelName); + return psubscribe(channelName, codec); } entry.psubscribe(codec, channelName); return entry; @@ -311,7 +311,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - return psubscribe(channelName); + return psubscribe(channelName, codec); } entry.psubscribe(codec, channelName); return entry; @@ -368,13 +368,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public void unsubscribe(final String channelName) { + public Codec unsubscribe(final String channelName) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { - return; + return null; } - entry.unsubscribe(channelName, new BaseRedisPubSubListener() { + return entry.unsubscribe(channelName, new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String channel) { @@ -393,13 +393,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public void punsubscribe(final String channelName) { + public Codec punsubscribe(final String channelName) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { - return; + return null; } - entry.punsubscribe(channelName, new BaseRedisPubSubListener() { + return entry.punsubscribe(channelName, new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String channel) { @@ -438,9 +438,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { entry.close(); Collection listeners = entry.getListeners(channelName); - unsubscribe(channelName); + Codec subscribeCodec = unsubscribe(channelName); if (!listeners.isEmpty()) { - PubSubConnectionEntry newEntry = subscribe(channelName); + PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec); for (RedisPubSubListener redisPubSubListener : listeners) { newEntry.addListener(channelName, redisPubSubListener); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 097cbe20f..988247d1c 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -175,10 +175,8 @@ public class MasterSlaveEntry { if (!entry.getClient().equals(connection.getRedisClient())) { connection.closeAsync(); return; - } else if (connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) { - connection.closeAsync(); - entry.getConnectionsSemaphore().release(); - return; + } else if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) { + connection.forceReconnect(); } entry.getConnections().add(connection); diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 59515c435..14f84e933 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -18,6 +18,7 @@ package org.redisson.connection; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -32,6 +33,8 @@ import org.redisson.client.protocol.pubsub.PubSubType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.internal.PlatformDependent; + public class PubSubConnectionEntry { public enum Status {ACTIVE, INACTIVE} @@ -42,6 +45,8 @@ public class PubSubConnectionEntry { private final Semaphore subscribedChannelsAmount; private final RedisPubSubConnection conn; private final int subscriptionsPerConnection; + + private final Map channel2Codec = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap> channelListeners = new ConcurrentHashMap>(); public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { @@ -121,11 +126,13 @@ public class PubSubConnectionEntry { subscribedChannelsAmount.release(); } - public void subscribe(Codec codec, final String channelName) { + public void subscribe(Codec codec, String channelName) { + channel2Codec.put(channelName, codec); conn.subscribe(codec, channelName); } - public void psubscribe(Codec codec, final String pattern) { + public void psubscribe(Codec codec, String pattern) { + channel2Codec.put(pattern, codec); conn.psubscribe(codec, pattern); } @@ -134,7 +141,7 @@ public class PubSubConnectionEntry { conn.subscribe(codec, channel); } - public void unsubscribe(final String channel, RedisPubSubListener listener) { + public Codec unsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String ch) { @@ -148,6 +155,7 @@ public class PubSubConnectionEntry { }); conn.addOneShotListener(listener); conn.unsubscribe(channel); + return channel2Codec.remove(channel); } private void removeListeners(String channel) { @@ -163,7 +171,7 @@ public class PubSubConnectionEntry { subscribedChannelsAmount.release(); } - public void punsubscribe(final String channel, RedisPubSubListener listener) { + public Codec punsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String ch) { @@ -176,6 +184,7 @@ public class PubSubConnectionEntry { }); conn.addOneShotListener(listener); conn.punsubscribe(channel); + return channel2Codec.remove(channel); } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 7a2d8d76b..746b22a5c 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -61,7 +61,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { c.setPassword(cfg.getPassword()); c.setDatabase(cfg.getDatabase()); c.setClientName(cfg.getClientName()); - c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts()); + c.setRefreshConnectionAfterFails(cfg.getRefreshConnectionAfterFails()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 18baafdf5..2b5a18de0 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -48,7 +48,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { newconfig.setPassword(cfg.getPassword()); newconfig.setDatabase(cfg.getDatabase()); newconfig.setClientName(cfg.getClientName()); - newconfig.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts()); + newconfig.setRefreshConnectionAfterFails(cfg.getRefreshConnectionAfterFails()); newconfig.setMasterAddress(addr); newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); diff --git a/src/main/java/org/redisson/core/RBatch.java b/src/main/java/org/redisson/core/RBatch.java index eccd1088b..8a0c319b7 100644 --- a/src/main/java/org/redisson/core/RBatch.java +++ b/src/main/java/org/redisson/core/RBatch.java @@ -17,6 +17,8 @@ package org.redisson.core; import java.util.List; +import org.redisson.client.codec.Codec; + import io.netty.util.concurrent.Future; /** @@ -41,6 +43,8 @@ public interface RBatch { */ RBucketAsync getBucket(String name); + RBucketAsync getBucket(String name, Codec codec); + /** * Returns HyperLogLog object * @@ -49,6 +53,8 @@ public interface RBatch { */ RHyperLogLogAsync getHyperLogLog(String name); + RHyperLogLogAsync getHyperLogLog(String name, Codec codec); + /** * Returns list instance by name. * @@ -57,6 +63,8 @@ public interface RBatch { */ RListAsync getList(String name); + RListAsync getList(String name, Codec codec); + /** * Returns map instance by name. * @@ -65,6 +73,8 @@ public interface RBatch { */ RMapAsync getMap(String name); + RMapAsync getMap(String name, Codec codec); + /** * Returns set instance by name. * @@ -73,6 +83,8 @@ public interface RBatch { */ RSetAsync getSet(String name); + RSetAsync getSet(String name, Codec codec); + /** * Returns topic instance by name. * @@ -81,6 +93,8 @@ public interface RBatch { */ RTopicAsync getTopic(String name); + RTopicAsync getTopic(String name, Codec codec); + /** * Returns queue instance by name. * @@ -89,6 +103,8 @@ public interface RBatch { */ RQueueAsync getQueue(String name); + RQueueAsync getQueue(String name, Codec codec); + /** * Returns blocking queue instance by name. * @@ -97,6 +113,8 @@ public interface RBatch { */ RBlockingQueueAsync getBlockingQueue(String name); + RBlockingQueueAsync getBlockingQueue(String name, Codec codec); + /** * Returns deque instance by name. * @@ -105,6 +123,8 @@ public interface RBatch { */ RDequeAsync getDequeAsync(String name); + RDequeAsync getDequeAsync(String name, Codec codec); + /** * Returns "atomic long" instance by name. * @@ -113,7 +133,25 @@ public interface RBatch { */ RAtomicLongAsync getAtomicLongAsync(String name); - RScoredSortedSet getScoredSortedSet(String name); + /** + * Returns Redis Sorted Set instance by name + * + * @param name + * @return + */ + RScoredSortedSetAsync getScoredSortedSet(String name); + + RScoredSortedSetAsync getScoredSortedSet(String name, Codec codec); + + /** + * Returns String based Redis Sorted Set instance by name + * All elements are inserted with the same score during addition, + * in order to force lexicographical ordering + * + * @param name + * @return + */ + RLexSortedSetAsync getLexSortedSet(String name); /** * Returns script operations object diff --git a/src/main/java/org/redisson/core/RScript.java b/src/main/java/org/redisson/core/RScript.java index 09eee87d1..de5598fcd 100644 --- a/src/main/java/org/redisson/core/RScript.java +++ b/src/main/java/org/redisson/core/RScript.java @@ -17,6 +17,7 @@ package org.redisson.core; import java.util.List; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -47,12 +48,20 @@ public interface RScript extends RScriptAsync { R evalSha(Mode mode, String shaDigest, ReturnType returnType, List keys, Object... values); + R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List keys, Object... values); + R evalSha(Mode mode, String shaDigest, ReturnType returnType); + R evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType); + R eval(Mode mode, String luaScript, ReturnType returnType, List keys, Object... values); + R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType, List keys, Object... values); + R eval(Mode mode, String luaScript, ReturnType returnType); + R eval(Mode mode, Codec codec, String luaScript, ReturnType returnType); + String scriptLoad(String luaScript); List scriptExists(String ... shaDigests); diff --git a/src/main/java/org/redisson/core/RScriptAsync.java b/src/main/java/org/redisson/core/RScriptAsync.java index 8b6244866..7e46aaefe 100644 --- a/src/main/java/org/redisson/core/RScriptAsync.java +++ b/src/main/java/org/redisson/core/RScriptAsync.java @@ -17,6 +17,7 @@ package org.redisson.core; import java.util.List; +import org.redisson.client.codec.Codec; import org.redisson.core.RScript.Mode; import org.redisson.core.RScript.ReturnType; @@ -28,12 +29,20 @@ public interface RScriptAsync { Future evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List keys, Object... values); + Future evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List keys, Object... values); + Future evalShaAsync(Mode mode, String shaDigest, ReturnType returnType); + Future evalShaAsync(Mode mode, Codec codec, String shaDigest, ReturnType returnType); + Future evalAsync(Mode mode, String luaScript, ReturnType returnType, List keys, Object... values); + Future evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType, List keys, Object... values); + Future evalAsync(Mode mode, String luaScript, ReturnType returnType); + Future evalAsync(Mode mode, Codec codec, String luaScript, ReturnType returnType); + Future scriptLoadAsync(String luaScript); Future> scriptExistsAsync(String ... shaDigests); diff --git a/src/test/java/org/redisson/RedissonBatchTest.java b/src/test/java/org/redisson/RedissonBatchTest.java index c945fbd44..2413caf90 100644 --- a/src/test/java/org/redisson/RedissonBatchTest.java +++ b/src/test/java/org/redisson/RedissonBatchTest.java @@ -6,11 +6,27 @@ import java.util.Map; import org.junit.Assert; import org.junit.Test; +import org.redisson.client.codec.StringCodec; import org.redisson.core.RBatch; import org.redisson.core.RListAsync; +import io.netty.util.concurrent.Future; + public class RedissonBatchTest extends BaseTest { + @Test + public void testDifferentCodecs() { + RBatch b = redisson.createBatch(); + b.getMap("test1").putAsync("1", "2"); + b.getMap("test2", StringCodec.INSTANCE).putAsync("21", "3"); + Future val1 = b.getMap("test1").getAsync("1"); + Future val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21"); + b.execute(); + + Assert.assertEquals("2", val1.getNow()); + Assert.assertEquals("3", val2.getNow()); + } + @Test public void testBatchList() { RBatch b = redisson.createBatch();