From 557c2e6e3145a8f0a113a9292c511696bdbd3c56 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 11 Nov 2015 12:03:52 +0300 Subject: [PATCH] Sentinel info decode fixed --- .../client/handler/CommandDecoder.java | 15 ++-- .../org/redisson/client/handler/State.java | 5 +- .../client/protocol/RedisCommand.java | 5 ++ .../client/protocol/RedisCommands.java | 7 +- ...oder.java => ListResultReplayDecoder.java} | 30 ++----- .../protocol/decoder/NestedMultiDecoder2.java | 79 +++++++++++++++++++ .../connection/SentinelConnectionManager.java | 3 +- 7 files changed, 111 insertions(+), 33 deletions(-) rename src/main/java/org/redisson/client/protocol/decoder/{StringMapReplayDecoder.java => ListResultReplayDecoder.java} (51%) create mode 100644 src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index b5029ffe6..904b2f67a 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -100,7 +100,9 @@ public class CommandDecoder extends ReplayingDecoder { CommandData cmd = (CommandData)data; try { // if (state().getSize() > 0) { -// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), state().getRespParts()); +// List respParts = new ArrayList(); +// respParts.addAll(state().getRespParts()); +// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), respParts); // } else { decode(in, cmd, null, ctx.channel(), currentDecoder); // } @@ -189,10 +191,9 @@ public class CommandDecoder extends ReplayingDecoder { handleResult(data, parts, result, false, channel); } else if (code == '*') { long size = readLong(in); -// state().setSize(size); +// state().setSizeOnce(size); List respParts = new ArrayList(); -// state().setRespParts(respParts); decodeMulti(in, data, parts, channel, currentDecoder, size, respParts); } else { @@ -214,10 +215,11 @@ public class CommandDecoder extends ReplayingDecoder { Object result = decoder.decode(respParts, state()); - // store current message index - checkpoint(); if (result instanceof Message) { + // store current message index + checkpoint(); + handleMultiResult(data, null, channel, result); // has next messages? if (in.writerIndex() > in.readerIndex()) { @@ -225,6 +227,9 @@ public class CommandDecoder extends ReplayingDecoder { } } else { handleMultiResult(data, parts, channel, result); +// if (parts != null && !decoder.isApplicable(parts.size(), state())) { +// state().setRespParts(parts); +// } } } diff --git a/src/main/java/org/redisson/client/handler/State.java b/src/main/java/org/redisson/client/handler/State.java index 84337b0e3..8eed56b08 100644 --- a/src/main/java/org/redisson/client/handler/State.java +++ b/src/main/java/org/redisson/client/handler/State.java @@ -29,7 +29,10 @@ public class State { super(); } - public void setSize(long size) { + public void setSizeOnce(long size) { + if (this.size != 0) { + return; + } this.size = size; } public long getSize() { diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index d465e1f7d..446ef9dd1 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -148,6 +148,11 @@ public class RedisCommand { this(name, null, null, reponseDecoder, objectParamIndex); } + public RedisCommand(String name, String subName, MultiDecoder replayMultiDecoder, ValueType outParamType) { + this(name, subName, replayMultiDecoder, -1); + this.outParamType = outParamType; + } + public RedisCommand(String name, MultiDecoder replayMultiDecoder, ValueType outParamType) { this(name, replayMultiDecoder, -1); this.outParamType = outParamType; diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index ef31a6f65..985e6accb 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -29,11 +29,13 @@ import org.redisson.client.protocol.convertor.KeyValueConvertor; import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; +import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; +import org.redisson.client.protocol.decoder.NestedMultiDecoder2; import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; @@ -43,7 +45,6 @@ import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; -import org.redisson.client.protocol.decoder.StringMapReplayDecoder; import org.redisson.client.protocol.decoder.StringReplayDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; @@ -183,7 +184,9 @@ public interface RedisCommands { RedisStrictCommand> CLUSTER_INFO = new RedisStrictCommand>("CLUSTER", "INFO", new StringMapDataDecoder()); RedisStrictCommand> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); - RedisStrictCommand>> SENTINEL_SLAVES = new RedisStrictCommand>>("SENTINEL", "SLAVES", new StringMapReplayDecoder()); + RedisCommand>> SENTINEL_SLAVES = new RedisCommand>>("SENTINEL", "SLAVES", + new NestedMultiDecoder2(new ObjectMapReplayDecoder(), new ListResultReplayDecoder()), ValueType.OBJECT + ); RedisStrictCommand INFO_REPLICATION = new RedisStrictCommand("INFO", "replication", new StringDataDecoder()); } diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ListResultReplayDecoder.java similarity index 51% rename from src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java rename to src/main/java/org/redisson/client/protocol/decoder/ListResultReplayDecoder.java index ec9d8b5cb..da09096d2 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ListResultReplayDecoder.java @@ -15,9 +15,7 @@ */ package org.redisson.client.protocol.decoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -26,7 +24,7 @@ import org.redisson.client.handler.State; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; -public class StringMapReplayDecoder implements MultiDecoder>> { +public class ListResultReplayDecoder implements MultiDecoder>> { @Override public Object decode(ByteBuf buf, State state) { @@ -34,26 +32,10 @@ public class StringMapReplayDecoder implements MultiDecoder> decode(List parts, State state) { - // TODO refactor - if (!parts.isEmpty()) { - if (parts.get(0) instanceof List) { - List> result = new ArrayList>(parts.size()); - for (Object object : parts) { - List> list = (List>) object; - result.addAll(list); - } - return result; - } - } - - Map result = new HashMap(parts.size()/2); - for (int i = 0; i < parts.size(); i++) { - if (i % 2 != 0) { - result.put(parts.get(i-1).toString(), parts.get(i).toString()); - } - } - return Collections.singletonList(result); + @SuppressWarnings("unchecked") + public List> decode(List parts, State state) { + Map[] res = parts.toArray(new Map[parts.size()]); + return Arrays.asList(res); } @Override diff --git a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java new file mode 100644 index 000000000..902c0f904 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java @@ -0,0 +1,79 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.List; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class NestedMultiDecoder2 implements MultiDecoder { + + private final MultiDecoder firstDecoder; + private final MultiDecoder secondDecoder; + + public NestedMultiDecoder2(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { + this.firstDecoder = firstDecoder; + this.secondDecoder = secondDecoder; + } + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return firstDecoder.decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + if (paramNum == 0) { + setCounter(state, 0); + } + return firstDecoder.isApplicable(paramNum, state); + } + + private Integer getCounter(State state) { + Integer value = state.getDecoderState(); + if (value == null) { + return 0; + } + return value; + } + + private void setCounter(State state, Integer value) { + state.setDecoderState(value); + } + + + @Override + public Object decode(List parts, State state) { + if (getCounter(state) == 2) { + setCounter(state, 0); + } + int counter = getCounter(state); + counter++; + setCounter(state, counter); + MultiDecoder decoder = null; + if (counter == 1) { + decoder = firstDecoder; + } + if (counter == 2) { + decoder = secondDecoder; + } + return decoder.decode(parts, state); + } + +} diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index dcef8662d..478f73b42 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -30,6 +30,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; @@ -84,7 +85,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { currentMaster.set(masterHost); log.info("master: {} added", masterHost); - List> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); + List> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); for (Map map : sentinelSlaves) { if (map.isEmpty()) { continue;