|
|
@ -82,7 +82,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
|
|
|
|
throw e;
|
|
|
|
throw e;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if (data instanceof CommandData) {
|
|
|
|
} else if (data instanceof CommandData) {
|
|
|
|
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
|
|
|
|
CommandData<Object, Object> cmd = (CommandData<Object, Object>) data;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
while (in.writerIndex() > in.readerIndex()) {
|
|
|
|
while (in.writerIndex() > in.readerIndex()) {
|
|
|
|
decode(in, cmd, null, channel, false, null);
|
|
|
|
decode(in, cmd, null, channel, false, null);
|
|
|
@ -99,7 +99,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
|
|
|
|
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
|
|
|
|
final Object result) throws IOException {
|
|
|
|
Object result) throws IOException {
|
|
|
|
if (executor.isShutdown()) {
|
|
|
|
if (executor.isShutdown()) {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -107,7 +107,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
|
|
|
|
if (result instanceof Message) {
|
|
|
|
if (result instanceof Message) {
|
|
|
|
checkpoint();
|
|
|
|
checkpoint();
|
|
|
|
|
|
|
|
|
|
|
|
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
|
|
|
|
RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
|
|
|
|
ChannelName channelName = ((Message) result).getChannel();
|
|
|
|
ChannelName channelName = ((Message) result).getChannel();
|
|
|
|
if (result instanceof PubSubStatusMessage) {
|
|
|
|
if (result instanceof PubSubStatusMessage) {
|
|
|
|
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
|
|
|
|
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
|
|
|
@ -121,7 +121,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
|
|
|
|
if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
|
|
|
|
if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
|
|
|
|
commands.remove(key);
|
|
|
|
commands.remove(key);
|
|
|
|
if (result instanceof PubSubPatternMessage) {
|
|
|
|
if (result instanceof PubSubPatternMessage) {
|
|
|
|
channelName = ((PubSubPatternMessage)result).getPattern();
|
|
|
|
channelName = ((PubSubPatternMessage) result).getPattern();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
PubSubEntry entry = entries.remove(channelName);
|
|
|
|
PubSubEntry entry = entries.remove(channelName);
|
|
|
|
if (keepOrder) {
|
|
|
|
if (keepOrder) {
|
|
|
@ -133,7 +133,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
|
|
|
|
|
|
|
|
|
|
|
|
if (keepOrder) {
|
|
|
|
if (keepOrder) {
|
|
|
|
if (result instanceof PubSubPatternMessage) {
|
|
|
|
if (result instanceof PubSubPatternMessage) {
|
|
|
|
channelName = ((PubSubPatternMessage)result).getPattern();
|
|
|
|
channelName = ((PubSubPatternMessage) result).getPattern();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
PubSubEntry entry = entries.get(channelName);
|
|
|
|
PubSubEntry entry = entries.get(channelName);
|
|
|
|
if (entry != null) {
|
|
|
|
if (entry != null) {
|
|
|
@ -160,9 +160,9 @@ public class CommandPubSubDecoder extends CommandDecoder {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void enqueueMessage(Object res, final RedisPubSubConnection pubSubConnection, final PubSubEntry entry) {
|
|
|
|
private void enqueueMessage(Object res, RedisPubSubConnection pubSubConnection, PubSubEntry entry) {
|
|
|
|
if (res != null) {
|
|
|
|
if (res != null) {
|
|
|
|
entry.getQueue().add((Message)res);
|
|
|
|
entry.getQueue().add((Message) res);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (entry.getSent().compareAndSet(false, true)) {
|
|
|
|
if (entry.getSent().compareAndSet(false, true)) {
|
|
|
|