diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 941de1852..4c270f4ff 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -42,8 +42,8 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch { - private static final Integer zeroCountMessage = 0; - private static final Integer newCountMessage = 1; + private static final Long zeroCountMessage = 0L; + private static final Long newCountMessage = 1L; private static final ConcurrentMap ENTRIES = PlatformDependent.newConcurrentHashMap(); @@ -72,18 +72,17 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return oldValue.getPromise(); } - RedisPubSubListener listener = createListener(value); - - commandExecutor.getConnectionManager().subscribe(listener, getChannelName()); + RedisPubSubListener listener = createListener(value); + commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener); return newPromise; } } - private RedisPubSubListener createListener(final RedissonCountDownLatchEntry value) { - RedisPubSubListener listener = new BaseRedisPubSubListener() { + private RedisPubSubListener createListener(final RedissonCountDownLatchEntry value) { + RedisPubSubListener listener = new BaseRedisPubSubListener() { @Override - public void onMessage(String channel, Integer message) { + public void onMessage(String channel, Long message) { if (!getChannelName().equals(channel)) { return; } @@ -175,7 +174,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return; } - Future f = commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1, + Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1, "local v = redis.call('decr', KEYS[1]);" + "if v <= 0 then redis.call('del', KEYS[1]) end;" + "if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" + @@ -208,7 +207,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public boolean trySetCount(long count) { - Future f = commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1, + Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1, "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('publish', ARGV[3], ARGV[1]); " @@ -222,8 +221,10 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public Future deleteAsync() { - return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1, - "if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end", + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1, + "if redis.call('del', KEYS[1]) == 1 then " + + "redis.call('publish', ARGV[2], ARGV[1]); " + + "return true else return false end", Collections.singletonList(getName()), newCountMessage, getChannelName()); } diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 743bbcb7d..f3afb1ba6 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -25,11 +25,14 @@ import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; +import org.redisson.codec.JsonJacksonCodec; +import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.RLock; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; @@ -116,7 +119,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { }; - commandExecutor.getConnectionManager().subscribe(listener, getChannelName()); + commandExecutor.getConnectionManager().subscribe(commandExecutor.getConnectionManager().getCodec(), getChannelName(), listener); return newPromise; } } diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index 6187e7075..721db406c 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -77,17 +77,9 @@ public class RedissonTopic implements RTopic { } private int addListener(RedisPubSubListener pubSubListener) { - Future future = commandExecutor.getConnectionManager().subscribe(name, codec); + Future future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener); future.syncUninterruptibly(); - PubSubConnectionEntry entry = future.getNow(); - synchronized (entry) { - if (entry.isActive()) { - entry.addListener(name, pubSubListener); - return pubSubListener.hashCode(); - } - } - // entry is inactive trying add again - return addListener(pubSubListener); + return pubSubListener.hashCode(); } @Override @@ -106,7 +98,7 @@ public class RedissonTopic implements RTopic { } } - // entry is inactive trying add again + // listener has been re-attached removeListener(listenerId); } diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index e12af0f62..f9681558d 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -32,6 +32,9 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; public class RedisPubSubConnection extends RedisConnection { @@ -102,8 +105,8 @@ public class RedisPubSubConnection extends RedisConnection { } } - private void async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { - channel.writeAndFlush(new CommandData(null, messageDecoder, null, command, params)); + private ChannelFuture async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { + return channel.writeAndFlush(new CommandData(null, messageDecoder, null, command, params)); } public Map getChannels() { diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 0a563db01..7278358d0 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -110,7 +110,7 @@ public interface RedisCommands { RedisCommand LSET = new RedisCommand("LSET", new VoidReplayConvertor(), 3); RedisCommand LPOP = new RedisCommand("LPOP"); RedisCommand LREM_SINGLE = new RedisCommand("LREM", new BooleanReplayConvertor(), 3); - RedisCommand LREM = new RedisCommand("LREM", 3); + RedisStrictCommand LREM = new RedisStrictCommand("LREM", 3); RedisCommand LINDEX = new RedisCommand("LINDEX"); RedisCommand LINSERT = new RedisCommand("LINSERT", 3, ValueType.OBJECTS); RedisStrictCommand LLEN_INT = new RedisStrictCommand("LLEN", new IntegerReplayConvertor()); @@ -128,11 +128,11 @@ public interface RedisCommands { RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor(), 2); - RedisCommand PFCOUNT = new RedisCommand("PFCOUNT"); + RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); RedisStrictCommand PFMERGE = new RedisStrictCommand("PFMERGE", new VoidReplayConvertor()); - RedisCommand RPOP = new RedisCommand("RPOP"); - RedisCommand LPUSH = new RedisCommand("LPUSH", 2); + RedisStrictCommand RPOP = new RedisStrictCommand("RPOP"); + RedisStrictCommand LPUSH = new RedisStrictCommand("LPUSH", 2); RedisCommand> LRANGE = new RedisCommand>("LRANGE", new ObjectListReplayDecoder()); RedisCommand RPUSH = new RedisCommand("RPUSH", 2, ValueType.OBJECTS); RedisCommand RPUSH_BOOLEAN = new RedisCommand("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS); @@ -194,7 +194,7 @@ public interface RedisCommands { RedisStrictCommand MOVE = new RedisStrictCommand("MOVE", new BooleanReplayConvertor()); RedisStrictCommand MIGRATE = new RedisStrictCommand("MIGRATE", new VoidReplayConvertor()); - RedisCommand PUBLISH = new RedisCommand("PUBLISH", 2); + RedisStrictCommand PUBLISH = new RedisStrictCommand("PUBLISH", 2); RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", new PubSubStatusDecoder()); RedisCommand UNSUBSCRIBE = new RedisCommand("UNSUBSCRIBE", new PubSubStatusDecoder()); diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index d2d45d8e7..aa341c930 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -18,6 +18,7 @@ package org.redisson.codec; import java.io.IOException; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -44,11 +45,13 @@ import io.netty.buffer.ByteBufInputStream; */ public class JsonJacksonCodec implements Codec { - private final ObjectMapper mapObjectMapper = initObjectMapper(); + public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); - protected ObjectMapper initObjectMapper() { - return new ObjectMapper(); - } + private final ObjectMapper mapObjectMapper = initObjectMapper(); + + protected ObjectMapper initObjectMapper() { + return new ObjectMapper(); + } private final Encoder encoder = new Encoder() { @Override diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 4cf620aed..691d3bbe4 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise; */ public interface ConnectionManager { + Promise subscribe(Codec codec, String channelName, RedisPubSubListener listener); + ConnectionListener getConnectListener(); IdleConnectionWatcher getConnectionWatcher(); @@ -89,12 +91,8 @@ public interface ConnectionManager { PubSubConnectionEntry getPubSubEntry(String channelName); - Future subscribe(String channelName, Codec codec); - Future psubscribe(String pattern, Codec codec); - void subscribe(RedisPubSubListener listener, String channelName); - Codec unsubscribe(String channelName); Codec punsubscribe(String channelName); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index f9774f90e..103c4bd4e 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -270,14 +270,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return name2PubSubConnection.get(channelName); } - public Future subscribe(String channelName, Codec codec) { + @Override + public Future psubscribe(final String channelName, final Codec codec) { Promise promise = group.next().newPromise(); - subscribe(channelName, codec, promise); + psubscribe(channelName, codec, promise); return promise; } - private void subscribe(final String channelName, final Codec codec, final Promise promise) { - // multiple channel names per PubSubConnections allowed + private void psubscribe(final String channelName, final Codec codec, final Promise promise) { + // multiple channel names per PubSubConnections are allowed PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { promise.setSuccess(сonnEntry); @@ -297,10 +298,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - subscribe(channelName, codec, promise); + psubscribe(channelName, codec, promise); return; } - entry.subscribe(codec, channelName); + entry.psubscribe(codec, channelName); promise.setSuccess(entry); return; } @@ -331,28 +332,33 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - subscribe(channelName, codec, promise); + psubscribe(channelName, codec, promise); return; } - entry.subscribe(codec, channelName); + entry.psubscribe(codec, channelName); promise.setSuccess(entry); } } }); } - @Override - public Future psubscribe(final String channelName, final Codec codec) { - Promise promise = group.next().newPromise(); - psubscribe(channelName, codec, promise); + public Promise subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener) { + Promise promise = newPromise(); + subscribe(codec, channelName, listener, promise); return promise; } - private void psubscribe(final String channelName, final Codec codec, final Promise promise) { - // multiple channel names per PubSubConnections are allowed + private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise) { PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { - promise.setSuccess(сonnEntry); + synchronized (сonnEntry) { + if (сonnEntry.isActive()) { + сonnEntry.addListener(channelName, listener); + promise.setSuccess(сonnEntry); + return; + } + } + connect(codec, channelName, listener, promise); return; } @@ -362,26 +368,38 @@ public class MasterSlaveConnectionManager implements ConnectionManager { PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { entry.release(); - promise.setSuccess(oldEntry); + synchronized (oldEntry) { + if (oldEntry.isActive()) { + oldEntry.addListener(channelName, listener); + promise.setSuccess(oldEntry); + return; + } + } + subscribe(codec, channelName, listener, promise); return; } - synchronized (entry) { if (!entry.isActive()) { entry.release(); - psubscribe(channelName, codec, promise); + subscribe(codec, channelName, listener, promise); return; } - entry.psubscribe(codec, channelName); + entry.subscribe(codec, listener, channelName); promise.setSuccess(entry); return; } } } + connect(codec, channelName, listener, promise); + } + + private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, + final Promise promise) { final int slot = 0; Future connFuture = nextPubSubConnection(slot); connFuture.addListener(new FutureListener() { + @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { @@ -390,78 +408,34 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } RedisPubSubConnection conn = future.getNow(); - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { releaseSubscribeConnection(slot, entry); - promise.setSuccess(oldEntry); - return; - } - synchronized (entry) { - if (!entry.isActive()) { - entry.release(); - psubscribe(channelName, codec, promise); - return; + synchronized (oldEntry) { + if (oldEntry.isActive()) { + oldEntry.addListener(channelName, listener); + promise.setSuccess(oldEntry); + return; + } } - entry.psubscribe(codec, channelName); - promise.setSuccess(entry); - } - } - }); - } - - @Override - public void subscribe(final RedisPubSubListener listener, final String channelName) { - PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); - if (сonnEntry != null) { - сonnEntry.subscribe(codec, listener, channelName); - return; - } - - Set entries = new HashSet(name2PubSubConnection.values()); - for (PubSubConnectionEntry entry : entries) { - if (entry.tryAcquire()) { - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - entry.release(); + subscribe(codec, channelName, listener, promise); return; } synchronized (entry) { if (!entry.isActive()) { entry.release(); - subscribe(listener, channelName); + subscribe(codec, channelName, listener, promise); return; } entry.subscribe(codec, listener, channelName); + promise.setSuccess(entry); return; } } - } - - final int slot = 0; - Future connFuture = nextPubSubConnection(slot); - connFuture.syncUninterruptibly(); - RedisPubSubConnection conn = connFuture.getNow(); - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.tryAcquire(); - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - releaseSubscribeConnection(slot, entry); - return; - } - synchronized (entry) { - if (!entry.isActive()) { - entry.release(); - subscribe(listener, channelName); - return; - } - entry.subscribe(codec, listener, channelName); - return; - } - + }); } @Override @@ -570,6 +544,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't resubscribe topic channel: " + channelName); + return; + } + PubSubConnectionEntry newEntry = future.getNow(); for (RedisPubSubListener redisPubSubListener : listeners) { newEntry.addListener(channelName, redisPubSubListener); @@ -581,12 +560,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } else { Codec subscribeCodec = unsubscribe(channelName); if (!listeners.isEmpty()) { - Future future = subscribe(channelName, subscribeCodec); + Future future = subscribe(subscribeCodec, channelName, null); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't resubscribe topic channel: " + channelName); + return; + } PubSubConnectionEntry newEntry = future.getNow(); for (RedisPubSubListener redisPubSubListener : listeners) { newEntry.addListener(channelName, redisPubSubListener);