PING support for Pub/Sub. #1281

pull/1423/head
Nikita 7 years ago
parent 56e276f9b4
commit d50660542f

@ -167,10 +167,10 @@ public class CommandPubSubDecoder extends CommandDecoder {
return null; return null;
} }
return commandData.getCommand().getReplayMultiDecoder(); return commandData.getCommand().getReplayMultiDecoder();
} else if (parts.get(0).equals("message")) { } else if (command.equals("message")) {
String channelName = (String) parts.get(1); String channelName = (String) parts.get(1);
return entries.get(channelName).getDecoder(); return entries.get(channelName).getDecoder();
} else if (parts.get(0).equals("pmessage")) { } else if (command.equals("pmessage")) {
String patternName = (String) parts.get(1); String patternName = (String) parts.get(1);
return entries.get(patternName).getDecoder(); return entries.get(patternName).getDecoder();
} }
@ -191,6 +191,10 @@ public class CommandPubSubDecoder extends CommandDecoder {
return entries.get(patternName).getDecoder().getDecoder(parts.size(), state()); return entries.get(patternName).getDecoder().getDecoder(parts.size(), state());
} }
} }
if (data != null && data.getCommand().getName().equals(RedisCommands.PING.getName())) {
return data.getCodec().getValueDecoder();
}
return super.selectDecoder(data, parts); return super.selectDecoder(data, parts);
} }

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
@ -50,12 +51,12 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
protected void sendPing(final ChannelHandlerContext ctx) { protected void sendPing(final ChannelHandlerContext ctx) {
RedisConnection connection = RedisConnection.getFrom(ctx.channel()); RedisConnection connection = RedisConnection.getFrom(ctx.channel());
final RFuture<Object> future = connection.async(RedisCommands.PING); final RFuture<String> future = connection.async(StringCodec.INSTANCE, RedisCommands.PING);
config.getTimer().newTimeout(new TimerTask() { config.getTimer().newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (future.cancel(false)) { if (future.cancel(false) || !future.isSuccess()) {
ctx.channel().close(); ctx.channel().close();
} else { } else {
sendPing(ctx); sendPing(ctx);

Loading…
Cancel
Save