|
|
|
@ -77,21 +77,24 @@ public class CommandPubSubDecoder extends CommandDecoder {
|
|
|
|
|
commands.remove(key);
|
|
|
|
|
entries.put(channelName, new PubSubEntry(d.getMessageDecoder()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
|
|
|
|
|
commands.remove(key);
|
|
|
|
|
entries.remove(key);
|
|
|
|
|
entries.remove(channelName);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
|
|
|
|
|
|
|
|
|
|
if (keepOrder) {
|
|
|
|
|
String ch = ((Message) result).getChannel();
|
|
|
|
|
String channelName = ((Message) result).getChannel();
|
|
|
|
|
if (result instanceof PubSubPatternMessage) {
|
|
|
|
|
ch = ((PubSubPatternMessage)result).getPattern();
|
|
|
|
|
channelName = ((PubSubPatternMessage)result).getPattern();
|
|
|
|
|
}
|
|
|
|
|
PubSubEntry item = entries.get(channelName);
|
|
|
|
|
if (item != null) {
|
|
|
|
|
enqueueMessage(result, pubSubConnection, item);
|
|
|
|
|
}
|
|
|
|
|
PubSubEntry item = entries.get(ch);
|
|
|
|
|
enqueueMessage(result, pubSubConnection, item);
|
|
|
|
|
} else {
|
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|