Publish/subscribe refactoring

pull/337/head
Nikita 9 years ago
parent b02d901fa3
commit 316ee3268a

@ -42,8 +42,8 @@ import io.netty.util.internal.PlatformDependent;
*/
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {
private static final Integer zeroCountMessage = 0;
private static final Integer newCountMessage = 1;
private static final Long zeroCountMessage = 0L;
private static final Long newCountMessage = 1L;
private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();
@ -72,18 +72,17 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return oldValue.getPromise();
}
RedisPubSubListener<Integer> listener = createListener(value);
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
RedisPubSubListener<Long> listener = createListener(value);
commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener);
return newPromise;
}
}
private RedisPubSubListener<Integer> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
private RedisPubSubListener<Long> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {
@Override
public void onMessage(String channel, Integer message) {
public void onMessage(String channel, Long message) {
if (!getChannelName().equals(channel)) {
return;
}
@ -175,7 +174,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return;
}
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" +
@ -208,7 +207,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean trySetCount(long count) {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', ARGV[3], ARGV[1]); "
@ -222,8 +221,10 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public Future<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end",
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('del', KEYS[1]) == 1 then "
+ "redis.call('publish', ARGV[2], ARGV[1]); "
+ "return true else return false end",
Collections.<Object>singletonList(getName()), newCountMessage, getChannelName());
}

@ -25,11 +25,14 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.RLock;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
@ -116,7 +119,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
};
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
commandExecutor.getConnectionManager().subscribe(commandExecutor.getConnectionManager().getCodec(), getChannelName(), listener);
return newPromise;
}
}

@ -77,17 +77,9 @@ public class RedissonTopic<M> implements RTopic<M> {
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(name, codec);
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.syncUninterruptibly();
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
return pubSubListener.hashCode();
}
}
// entry is inactive trying add again
return addListener(pubSubListener);
return pubSubListener.hashCode();
}
@Override
@ -106,7 +98,7 @@ public class RedissonTopic<M> implements RTopic<M> {
}
}
// entry is inactive trying add again
// listener has been re-attached
removeListener(listenerId);
}

@ -32,6 +32,9 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
public class RedisPubSubConnection extends RedisConnection {
@ -102,8 +105,8 @@ public class RedisPubSubConnection extends RedisConnection {
}
}
private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
return channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
}
public Map<String, Codec> getChannels() {

@ -110,7 +110,7 @@ public interface RedisCommands {
RedisCommand<Void> LSET = new RedisCommand<Void>("LSET", new VoidReplayConvertor(), 3);
RedisCommand<Object> LPOP = new RedisCommand<Object>("LPOP");
RedisCommand<Boolean> LREM_SINGLE = new RedisCommand<Boolean>("LREM", new BooleanReplayConvertor(), 3);
RedisCommand<Long> LREM = new RedisCommand<Long>("LREM", 3);
RedisStrictCommand<Long> LREM = new RedisStrictCommand<Long>("LREM", 3);
RedisCommand<Object> LINDEX = new RedisCommand<Object>("LINDEX");
RedisCommand<Object> LINSERT = new RedisCommand<Object>("LINSERT", 3, ValueType.OBJECTS);
RedisStrictCommand<Integer> LLEN_INT = new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor());
@ -128,11 +128,11 @@ public interface RedisCommands {
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor(), 2);
RedisCommand<Long> PFCOUNT = new RedisCommand<Long>("PFCOUNT");
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");
RedisStrictCommand<Void> PFMERGE = new RedisStrictCommand<Void>("PFMERGE", new VoidReplayConvertor());
RedisCommand<Long> RPOP = new RedisCommand<Long>("RPOP");
RedisCommand<Long> LPUSH = new RedisCommand<Long>("LPUSH", 2);
RedisStrictCommand<Long> RPOP = new RedisStrictCommand<Long>("RPOP");
RedisStrictCommand<Long> LPUSH = new RedisStrictCommand<Long>("LPUSH", 2);
RedisCommand<List<Object>> LRANGE = new RedisCommand<List<Object>>("LRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<Long> RPUSH = new RedisCommand<Long>("RPUSH", 2, ValueType.OBJECTS);
RedisCommand<Boolean> RPUSH_BOOLEAN = new RedisCommand<Boolean>("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS);
@ -194,7 +194,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> MOVE = new RedisStrictCommand<Boolean>("MOVE", new BooleanReplayConvertor());
RedisStrictCommand<Void> MIGRATE = new RedisStrictCommand<Void>("MIGRATE", new VoidReplayConvertor());
RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 2);
RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH", 2);
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<Object>("UNSUBSCRIBE", new PubSubStatusDecoder());

@ -18,6 +18,7 @@ package org.redisson.codec;
import java.io.IOException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
@ -44,11 +45,13 @@ import io.netty.buffer.ByteBufInputStream;
*/
public class JsonJacksonCodec implements Codec {
private final ObjectMapper mapObjectMapper = initObjectMapper();
public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec();
protected ObjectMapper initObjectMapper() {
return new ObjectMapper();
}
private final ObjectMapper mapObjectMapper = initObjectMapper();
protected ObjectMapper initObjectMapper() {
return new ObjectMapper();
}
private final Encoder encoder = new Encoder() {
@Override

@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise;
*/
public interface ConnectionManager {
Promise<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener listener);
ConnectionListener getConnectListener();
IdleConnectionWatcher getConnectionWatcher();
@ -89,12 +91,8 @@ public interface ConnectionManager {
PubSubConnectionEntry getPubSubEntry(String channelName);
Future<PubSubConnectionEntry> subscribe(String channelName, Codec codec);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec);
<V> void subscribe(RedisPubSubListener<V> listener, String channelName);
Codec unsubscribe(String channelName);
Codec punsubscribe(String channelName);

@ -270,14 +270,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return name2PubSubConnection.get(channelName);
}
public Future<PubSubConnectionEntry> subscribe(String channelName, Codec codec) {
@Override
public Future<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec) {
Promise<PubSubConnectionEntry> promise = group.next().newPromise();
subscribe(channelName, codec, promise);
psubscribe(channelName, codec, promise);
return promise;
}
private void subscribe(final String channelName, final Codec codec, final Promise<PubSubConnectionEntry> promise) {
// multiple channel names per PubSubConnections allowed
private void psubscribe(final String channelName, final Codec codec, final Promise<PubSubConnectionEntry> promise) {
// multiple channel names per PubSubConnections are allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
promise.setSuccess(сonnEntry);
@ -297,10 +298,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(channelName, codec, promise);
psubscribe(channelName, codec, promise);
return;
}
entry.subscribe(codec, channelName);
entry.psubscribe(codec, channelName);
promise.setSuccess(entry);
return;
}
@ -331,28 +332,33 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(channelName, codec, promise);
psubscribe(channelName, codec, promise);
return;
}
entry.subscribe(codec, channelName);
entry.psubscribe(codec, channelName);
promise.setSuccess(entry);
}
}
});
}
@Override
public Future<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec) {
Promise<PubSubConnectionEntry> promise = group.next().newPromise();
psubscribe(channelName, codec, promise);
public Promise<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener) {
Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise);
return promise;
}
private void psubscribe(final String channelName, final Codec codec, final Promise<PubSubConnectionEntry> promise) {
// multiple channel names per PubSubConnections are allowed
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
promise.setSuccess(сonnEntry);
synchronized (сonnEntry) {
if (сonnEntry.isActive()) {
сonnEntry.addListener(channelName, listener);
promise.setSuccess(сonnEntry);
return;
}
}
connect(codec, channelName, listener, promise);
return;
}
@ -362,26 +368,38 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
promise.setSuccess(oldEntry);
synchronized (oldEntry) {
if (oldEntry.isActive()) {
oldEntry.addListener(channelName, listener);
promise.setSuccess(oldEntry);
return;
}
}
subscribe(codec, channelName, listener, promise);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
psubscribe(channelName, codec, promise);
subscribe(codec, channelName, listener, promise);
return;
}
entry.psubscribe(codec, channelName);
entry.subscribe(codec, listener, channelName);
promise.setSuccess(entry);
return;
}
}
}
connect(codec, channelName, listener, promise);
}
private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener,
final Promise<PubSubConnectionEntry> promise) {
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
@ -390,78 +408,34 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
promise.setSuccess(oldEntry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
psubscribe(channelName, codec, promise);
return;
synchronized (oldEntry) {
if (oldEntry.isActive()) {
oldEntry.addListener(channelName, listener);
promise.setSuccess(oldEntry);
return;
}
}
entry.psubscribe(codec, channelName);
promise.setSuccess(entry);
}
}
});
}
@Override
public void subscribe(final RedisPubSubListener listener, final String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
сonnEntry.subscribe(codec, listener, channelName);
return;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
for (PubSubConnectionEntry entry : entries) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
subscribe(codec, channelName, listener, promise);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
subscribe(codec, channelName, listener, promise);
return;
}
entry.subscribe(codec, listener, channelName);
promise.setSuccess(entry);
return;
}
}
}
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.syncUninterruptibly();
RedisPubSubConnection conn = connFuture.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
return;
}
entry.subscribe(codec, listener, channelName);
return;
}
});
}
@Override
@ -570,6 +544,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
@ -581,12 +560,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} else {
Codec subscribeCodec = unsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = subscribe(channelName, subscribeCodec);
Future<PubSubConnectionEntry> future = subscribe(subscribeCodec, channelName, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);

Loading…
Cancel
Save