Config.keepPubSubOrder setting added. #919

pull/709/merge
Nikita 8 years ago
parent 8cad23cdce
commit e56e0910a9

@ -48,6 +48,7 @@ public class RedisClientConfig {
private int database; private int database;
private String clientName; private String clientName;
private boolean readOnly; private boolean readOnly;
private boolean keepPubSubOrder = true;
private boolean sslEnableEndpointIdentification = true; private boolean sslEnableEndpointIdentification = true;
private SslProvider sslProvider = SslProvider.JDK; private SslProvider sslProvider = SslProvider.JDK;
@ -56,6 +57,7 @@ public class RedisClientConfig {
private URI sslKeystore; private URI sslKeystore;
private String sslKeystorePassword; private String sslKeystorePassword;
public RedisClientConfig setAddress(String host, int port) { public RedisClientConfig setAddress(String host, int port) {
this.address = URIBuilder.create("redis://" + host + ":" + port); this.address = URIBuilder.create("redis://" + host + ":" + port);
return this; return this;
@ -201,6 +203,11 @@ public class RedisClientConfig {
return this; return this;
} }
public boolean isKeepPubSubOrder() {
return keepPubSubOrder;
}
public void setKeepPubSubOrder(boolean keepPubSubOrder) {
this.keepPubSubOrder = keepPubSubOrder;
}
} }

@ -44,18 +44,20 @@ import io.netty.util.internal.PlatformDependent;
public class CommandPubSubDecoder extends CommandDecoder { public class CommandPubSubDecoder extends CommandDecoder {
// It is not needed to use concurrent map because responses are coming consecutive // It is not needed to use concurrent map because responses are coming consecutive
private final Map<String, MultiDecoder<Object>> pubSubMessageDecoders = new HashMap<String, MultiDecoder<Object>>(); private final Map<String, PubSubEntry> entries = new HashMap<String, PubSubEntry>();
private final Map<PubSubKey, CommandData<Object, Object>> pubSubChannels = PlatformDependent.newConcurrentHashMap(); private final Map<PubSubKey, CommandData<Object, Object>> commands = PlatformDependent.newConcurrentHashMap();
private final ExecutorService executor; private final ExecutorService executor;
private final boolean keepOrder;
public CommandPubSubDecoder(ExecutorService executor) { public CommandPubSubDecoder(ExecutorService executor, boolean keepOrder) {
this.executor = executor; this.executor = executor;
this.keepOrder = keepOrder;
} }
public void addPubSubCommand(String channel, CommandData<Object, Object> data) { public void addPubSubCommand(String channel, CommandData<Object, Object> data) {
String operation = data.getCommand().getName().toLowerCase(); String operation = data.getCommand().getName().toLowerCase();
pubSubChannels.put(new PubSubKey(channel, operation), data); commands.put(new PubSubKey(channel, operation), data);
} }
@Override @Override
@ -70,27 +72,68 @@ public class CommandPubSubDecoder extends CommandDecoder {
String channelName = ((PubSubStatusMessage) result).getChannel(); String channelName = ((PubSubStatusMessage) result).getChannel();
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation); PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = pubSubChannels.get(key); CommandData<Object, Object> d = commands.get(key);
if (Arrays.asList(RedisCommands.PSUBSCRIBE.getName(), RedisCommands.SUBSCRIBE.getName()).contains(d.getCommand().getName())) { if (Arrays.asList(RedisCommands.PSUBSCRIBE.getName(), RedisCommands.SUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key); commands.remove(key);
pubSubMessageDecoders.put(channelName, d.getMessageDecoder()); entries.put(channelName, new PubSubEntry(d.getMessageDecoder()));
} }
if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) { if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key); commands.remove(key);
pubSubMessageDecoders.remove(channelName); entries.remove(key);
} }
} }
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
if (keepOrder) {
PubSubEntry item = entries.get(((Message) result).getChannel());
enqueueMessage(result, pubSubConnection, item);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
}
});
}
}
}
private void enqueueMessage(Object result, final RedisPubSubConnection pubSubConnection, final PubSubEntry entry) {
if (result != null) {
entry.getQueue().add((Message)result);
}
if (entry.getSent().compareAndSet(false, true)) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
if (result instanceof PubSubStatusMessage) { try {
pubSubConnection.onMessage((PubSubStatusMessage) result); while (true) {
} else if (result instanceof PubSubMessage) { Message result = entry.getQueue().poll();
pubSubConnection.onMessage((PubSubMessage) result); if (result != null) {
} else { if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubPatternMessage) result); pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
} else {
break;
}
}
} finally {
entry.getSent().set(false);
if (!entry.getQueue().isEmpty()) {
enqueueMessage(null, pubSubConnection, entry);
}
} }
} }
}); });
@ -107,17 +150,17 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) { if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) {
String channelName = parts.get(1).toString(); String channelName = parts.get(1).toString();
PubSubKey key = new PubSubKey(channelName, command); PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = pubSubChannels.get(key); CommandData<Object, Object> commandData = commands.get(key);
if (commandData == null) { if (commandData == null) {
return null; return null;
} }
return commandData.getCommand().getReplayMultiDecoder(); return commandData.getCommand().getReplayMultiDecoder();
} else if (parts.get(0).equals("message")) { } else if (parts.get(0).equals("message")) {
String channelName = (String) parts.get(1); String channelName = (String) parts.get(1);
return pubSubMessageDecoders.get(channelName); return entries.get(channelName).getDecoder();
} else if (parts.get(0).equals("pmessage")) { } else if (parts.get(0).equals("pmessage")) {
String patternName = (String) parts.get(1); String patternName = (String) parts.get(1);
return pubSubMessageDecoders.get(patternName); return entries.get(patternName).getDecoder();
} }
} }
@ -129,11 +172,11 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (data == null && parts != null) { if (data == null && parts != null) {
if (parts.size() == 2 && "message".equals(parts.get(0))) { if (parts.size() == 2 && "message".equals(parts.get(0))) {
String channelName = (String) parts.get(1); String channelName = (String) parts.get(1);
return pubSubMessageDecoders.get(channelName); return entries.get(channelName).getDecoder();
} }
if (parts.size() == 3 && "pmessage".equals(parts.get(0))) { if (parts.size() == 3 && "pmessage".equals(parts.get(0))) {
String patternName = (String) parts.get(1); String patternName = (String) parts.get(1);
return pubSubMessageDecoders.get(patternName); return entries.get(patternName).getDecoder();
} }
} }
return super.selectDecoder(data, parts); return super.selectDecoder(data, parts);

@ -0,0 +1,55 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.handler;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
/**
*
* @author Nikita Koksharov
*
*/
public class PubSubEntry {
private final MultiDecoder<Object> decoder;
private final Queue<Message> queue = new ConcurrentLinkedQueue<Message>();
private final AtomicBoolean sent = new AtomicBoolean();
public PubSubEntry(MultiDecoder<Object> decoder) {
super();
this.decoder = decoder;
}
public MultiDecoder<Object> getDecoder() {
return decoder;
}
public Queue<Message> getQueue() {
return queue;
}
public AtomicBoolean getSent() {
return sent;
}
}

@ -89,7 +89,7 @@ public class RedisChannelInitializer extends ChannelInitializer<Channel> {
if (type == Type.PLAIN) { if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandDecoder()); ch.pipeline().addLast(new CommandDecoder());
} else { } else {
ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor())); ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder()));
} }
} }

@ -15,6 +15,13 @@
*/ */
package org.redisson.client.protocol.pubsub; package org.redisson.client.protocol.pubsub;
/**
*
* @author Nikita Koksharov
*
*/
public interface Message { public interface Message {
String getChannel();
} }

@ -15,6 +15,11 @@
*/ */
package org.redisson.client.protocol.pubsub; package org.redisson.client.protocol.pubsub;
/**
*
* @author Nikita Koksharov
*
*/
public class PubSubMessage implements Message { public class PubSubMessage implements Message {
private final String channel; private final String channel;

@ -15,6 +15,11 @@
*/ */
package org.redisson.client.protocol.pubsub; package org.redisson.client.protocol.pubsub;
/**
*
* @author Nikita Koksharov
*
*/
public class PubSubPatternMessage implements Message { public class PubSubPatternMessage implements Message {
private final String pattern; private final String pattern;

@ -15,6 +15,11 @@
*/ */
package org.redisson.client.protocol.pubsub; package org.redisson.client.protocol.pubsub;
/**
*
* @author Nikita Koksharov
*
*/
public class PubSubStatusMessage implements Message { public class PubSubStatusMessage implements Message {
private final PubSubType type; private final PubSubType type;

@ -91,6 +91,8 @@ public class Config {
private long lockWatchdogTimeout = 30 * 1000; private long lockWatchdogTimeout = 30 * 1000;
private boolean keepPubSubOrder = true;
public Config() { public Config() {
} }
@ -103,6 +105,7 @@ public class Config {
oldConf.setCodec(new JsonJacksonCodec()); oldConf.setCodec(new JsonJacksonCodec());
} }
setKeepPubSubOrder(oldConf.isKeepPubSubOrder());
setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout()); setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
setNettyThreads(oldConf.getNettyThreads()); setNettyThreads(oldConf.getNettyThreads());
setThreads(oldConf.getThreads()); setThreads(oldConf.getThreads());
@ -580,6 +583,26 @@ public class Config {
return lockWatchdogTimeout; return lockWatchdogTimeout;
} }
/**
* Define whether keep PubSub messages handling in arrival order
* or handle messages concurrently.
* <p>
* This setting applied only for PubSub messages published to single channel.
* <p>
* Default is <code>true</code>.
*
* @param keepPubSubOrder - <code>true</code> if order required, <code>false</code> otherwise.
* @return config
*/
public Config setKeepPubSubOrder(boolean keepPubSubOrder) {
this.keepPubSubOrder = keepPubSubOrder;
return this;
}
public boolean isKeepPubSubOrder() {
return keepPubSubOrder;
}
/** /**
* Read config object stored in JSON format from <code>String</code> * Read config object stored in JSON format from <code>String</code>
* *

@ -375,7 +375,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
.setSslKeystore(config.getSslKeystore()) .setSslKeystore(config.getSslKeystore())
.setSslKeystorePassword(config.getSslKeystorePassword()) .setSslKeystorePassword(config.getSslKeystorePassword())
.setDatabase(config.getDatabase()) .setDatabase(config.getDatabase())
.setClientName(config.getClientName()); .setClientName(config.getClientName())
.setKeepPubSubOrder(cfg.isKeepPubSubOrder());
if (type != NodeType.SENTINEL) { if (type != NodeType.SENTINEL) {
redisConfig.setPassword(config.getPassword()); redisConfig.setPassword(config.getPassword());

Loading…
Cancel
Save