Replaying phase handling in CommandDecoder

pull/472/head
Nikita 9 years ago
parent 26f2d0a4b9
commit abca09e5b7

@ -36,6 +36,7 @@ import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
@ -79,41 +80,43 @@ public class CommandDecoder extends ReplayingDecoder<State> {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
Decoder<Object> currentDecoder = null;
if (data == null) {
currentDecoder = StringCodec.INSTANCE.getValueDecoder();
}
if (state() == null) {
state(new State());
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}
if (state() == null) {
boolean makeCheckpoint = data != null;
if (data != null) {
if (data instanceof CommandsData) {
makeCheckpoint = false;
} else {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
if (cmd.getCommand().getReplayMultiDecoder() != null && NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())) {
makeCheckpoint = false;
}
}
}
state(new State(makeCheckpoint));
}
state().setDecoderState(null);
if (data == null) {
decode(in, null, null, ctx.channel(), currentDecoder);
decode(in, null, null, ctx.channel());
} else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
// if (state().getSize() > 0) {
// List<Object> respParts = new ArrayList<Object>();
// if (state().getRespParts() != null) {
// respParts = state().getRespParts();
// }
// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), respParts, true);
// } else {
decode(in, cmd, null, ctx.channel(), currentDecoder);
// }
if (state().getLevels().size() > 0) {
decodeFromCheckpoint(ctx, in, data, cmd);
} else {
decode(in, cmd, null, ctx.channel());
}
} catch (IOException e) {
cmd.getPromise().tryFailure(e);
}
} else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data;
handleCommandsDataResponse(ctx, in, data, currentDecoder, commands);
decodeCommandBatch(ctx, in, data, commands);
return;
}
@ -122,18 +125,54 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state(null);
}
private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
Decoder<Object> currentDecoder, CommandsData commands) {
int i = state().getIndex();
private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandData<Object, Object> cmd) throws IOException {
if (state().getLevels().size() == 2) {
StateLevel secondLevel = state().getLevels().get(1);
if (secondLevel.getParts().isEmpty()) {
state().getLevels().remove(1);
}
}
if (state().getLevels().size() == 2) {
StateLevel firstLevel = state().getLevels().get(0);
StateLevel secondLevel = state().getLevels().get(1);
decodeMulti(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());
Channel channel = ctx.channel();
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts(), channel);
if (decoder != null) {
Object result = decoder.decode(firstLevel.getParts(), state());
if (data != null) {
handleResult(cmd, null, result, true, channel);
}
}
}
if (state().getLevels().size() == 1) {
StateLevel firstLevel = state().getLevels().get(0);
if (firstLevel.getParts().isEmpty()) {
state().resetLevel();
decode(in, cmd, null, ctx.channel());
} else {
decodeMulti(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
}
}
}
private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandsData commandBatch) {
int i = state().getBatchIndex();
RedisException error = null;
while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null;
try {
checkpoint();
state().setIndex(i);
cmd = (CommandData<Object, Object>) commands.getCommands().get(i);
decode(in, cmd, null, ctx.channel(), currentDecoder);
state().setBatchIndex(i);
cmd = (CommandData<Object, Object>) commandBatch.getCommands().get(i);
decode(in, cmd, null, ctx.channel());
i++;
} catch (IOException e) {
cmd.getPromise().tryFailure(e);
@ -147,8 +186,8 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
if (i == commands.getCommands().size()) {
Promise<Void> promise = commands.getPromise();
if (i == commandBatch.getCommands().size()) {
Promise<Void> promise = commandBatch.getPromise();
if (error != null) {
if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
@ -164,11 +203,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state(null);
} else {
checkpoint();
state().setIndex(i);
state().setBatchIndex(i);
}
}
private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException {
private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel) throws IOException {
int code = in.readByte();
if (code == '+') {
String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
@ -212,32 +251,40 @@ public class CommandDecoder extends ReplayingDecoder<State> {
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
result = decoder(data, parts, currentDecoder).decode(buf, state());
Decoder<Object> decoder = selectDecoder(data, parts);
result = decoder.decode(buf, state());
}
handleResult(data, parts, result, false, channel);
} else if (code == '*') {
int level = state().incLevel();
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
boolean top = false;
// if (state().trySetSize(size)) {
// state().setRespParts(respParts);
// top = true;
// }
decodeMulti(in, data, parts, channel, currentDecoder, size, respParts, top);
List<Object> respParts;
if (state().getLevels().size()-1 >= level) {
StateLevel stateLevel = state().getLevels().get(level);
respParts = stateLevel.getParts();
size = stateLevel.getSize();
} else {
respParts = new ArrayList<Object>();
if (state().isMakeCheckpoint()) {
state().addLevel(new StateLevel(size, respParts));
}
}
decodeMulti(in, data, parts, channel, size, respParts);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}
}
private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, Decoder<Object> currentDecoder, long size, List<Object> respParts, boolean top)
Channel channel, long size, List<Object> respParts)
throws IOException {
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel, currentDecoder);
// if (top) {
// checkpoint();
// }
decode(in, data, respParts, channel);
if (state().isMakeCheckpoint()) {
checkpoint();
}
}
MultiDecoder<Object> decoder = messageDecoder(data, respParts, channel);
@ -246,7 +293,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
Object result = decoder.decode(respParts, state());
if (data != null) {
handleResult(data, parts, result, true, channel);
return;
}
if (result instanceof Message) {
// store current message index
@ -255,18 +305,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
handleMultiResult(data, null, channel, result);
// has next messages?
if (in.writerIndex() > in.readerIndex()) {
decode(in, data, null, channel, currentDecoder);
decode(in, data, null, channel);
}
} else {
handleMultiResult(data, parts, channel, result);
}
}
private void handleMultiResult(CommandData<Object, Object> data, List<Object> parts,
Channel channel, Object result) {
if (data != null) {
handleResult(data, parts, result, true, channel);
} else {
if (result instanceof PubSubStatusMessage) {
String channelName = ((PubSubStatusMessage) result).getChannel();
CommandData<Object, Object> d = pubSubChannels.get(channelName);
@ -289,7 +334,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
}
}
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) {
if (data != null) {
@ -329,7 +373,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return data.getCommand().getReplayMultiDecoder();
}
private Decoder<Object> decoder(CommandData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) {
private Decoder<Object> selectDecoder(CommandData<Object, Object> data, List<Object> parts) {
if (data == null) {
if (parts.size() == 2 && parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
@ -339,7 +383,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String patternName = (String) parts.get(1);
return pubSubMessageDecoders.get(patternName);
}
return currentDecoder;
return StringCodec.INSTANCE.getValueDecoder();
}
Decoder<Object> decoder = data.getCommand().getReplayDecoder();

@ -15,50 +15,80 @@
*/
package org.redisson.client.handler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.redisson.client.protocol.decoder.DecoderState;
public class State {
private int index;
private Object decoderState;
private int batchIndex;
private DecoderState decoderState;
private int level = -1;
private List<StateLevel> levels;
private DecoderState decoderStateCopy;
private final boolean makeCheckpoint;
private long size;
private List<Object> respParts;
public State(boolean makeCheckpoint) {
this.makeCheckpoint = makeCheckpoint;
}
public State() {
super();
public boolean isMakeCheckpoint() {
return makeCheckpoint;
}
public boolean trySetSize(long size) {
if (this.size != 0) {
return false;
public void resetLevel() {
level = -1;
}
this.size = size;
return true;
public int decLevel() {
return --level;
}
public long getSize() {
return size;
public int incLevel() {
return ++level;
}
public void setRespParts(List<Object> respParts) {
this.respParts = respParts;
public void addLevel(StateLevel stateLevel) {
if (levels == null) {
levels = new ArrayList<StateLevel>(2);
}
public List<Object> getRespParts() {
return respParts;
levels.add(stateLevel);
}
public List<StateLevel> getLevels() {
if (levels == null) {
return Collections.emptyList();
}
return levels;
}
public void setIndex(int index) {
this.index = index;
public void setBatchIndex(int index) {
this.batchIndex = index;
}
public int getIndex() {
return index;
public int getBatchIndex() {
return batchIndex;
}
public <T> T getDecoderState() {
public <T extends DecoderState> T getDecoderState() {
return (T) decoderState;
}
public void setDecoderState(Object decoderState) {
public void setDecoderState(DecoderState decoderState) {
this.decoderState = decoderState;
}
public DecoderState getDecoderStateCopy() {
return decoderStateCopy;
}
public void setDecoderStateCopy(DecoderState decoderStateCopy) {
this.decoderStateCopy = decoderStateCopy;
}
@Override
public String toString() {
return "State [batchIndex=" + batchIndex + ", decoderState=" + decoderState + ", level=" + level + ", levels="
+ levels + ", decoderStateCopy=" + decoderStateCopy + "]";
}
}

@ -0,0 +1,29 @@
package org.redisson.client.handler;
import java.util.List;
public class StateLevel {
private long size;
private List<Object> parts;
public StateLevel(long size, List<Object> parts) {
super();
this.size = size;
this.parts = parts;
}
public long getSize() {
return size;
}
public List<Object> getParts() {
return parts;
}
@Override
public String toString() {
return "StateLevel [size=" + size + ", parts=" + parts + "]";
}
}

@ -0,0 +1,7 @@
package org.redisson.client.protocol.decoder;
public interface DecoderState {
DecoderState copy();
}

@ -38,7 +38,7 @@ public class FlatNestedMultiDecoder<T> extends NestedMultiDecoder {
@Override
public boolean isApplicable(int paramNum, State state) {
DecoderState ds = getDecoder(state);
NestedDecoderState ds = getDecoder(state);
if (paramNum == 0) {
ds.resetDecoderIndex();
}

@ -24,13 +24,19 @@ import io.netty.buffer.ByteBuf;
public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
public static class DecoderState {
public static class NestedDecoderState implements DecoderState {
int decoderIndex;
int flipDecoderIndex;
public DecoderState() {
public NestedDecoderState() {
}
public NestedDecoderState(int decoderIndex, int flipDecoderIndex) {
super();
this.decoderIndex = decoderIndex;
this.flipDecoderIndex = flipDecoderIndex;
}
public int getDecoderIndex() {
@ -53,6 +59,16 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
flipDecoderIndex++;
}
@Override
public DecoderState copy() {
return new NestedDecoderState(decoderIndex, flipDecoderIndex);
}
@Override
public String toString() {
return "NestedDecoderState [decoderIndex=" + decoderIndex + ", flipDecoderIndex=" + flipDecoderIndex + "]";
}
}
protected final MultiDecoder<Object> firstDecoder;
@ -81,7 +97,7 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
DecoderState ds = getDecoder(state);
NestedDecoderState ds = getDecoder(state);
MultiDecoder<?> decoder = null;
if (ds.getFlipDecoderIndex() == 2) {
@ -96,7 +112,7 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
@Override
public boolean isApplicable(int paramNum, State state) {
DecoderState ds = getDecoder(state);
NestedDecoderState ds = getDecoder(state);
if (paramNum == 0) {
ds.incFlipDecoderIndex();
ds.resetDecoderIndex();
@ -118,10 +134,10 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
return decoder.isApplicable(paramNum, state);
}
protected final DecoderState getDecoder(State state) {
DecoderState ds = state.getDecoderState();
protected final NestedDecoderState getDecoder(State state) {
NestedDecoderState ds = state.getDecoderState();
if (ds == null) {
ds = new DecoderState();
ds = new NestedDecoderState();
state.setDecoderState(ds);
}
return ds;
@ -137,7 +153,7 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
return decoder.decode(parts, state);
}
DecoderState ds = getDecoder(state);
NestedDecoderState ds = getDecoder(state);
if (parts.isEmpty()) {
ds.resetDecoderIndex();
}

@ -21,11 +21,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class CacheGetAllDecoder implements MultiDecoder<List<Object>> {
@ -37,7 +37,7 @@ public class CacheGetAllDecoder implements MultiDecoder<List<Object>> {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
return LongCodec.INSTANCE.getValueDecoder().decode(buf, state);
}
@Override

@ -5,6 +5,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.junit.Test;
import org.redisson.core.GeoEntry;
@ -104,6 +105,61 @@ public class RedissonGeoTest extends BaseTest {
assertThat(geo.radiusWithDistance(15, 37, 200, GeoUnit.KILOMETERS)).isEqualTo(expected);
}
@Test
public void testRadiusWithDistanceHugeAmount() {
RGeo<String> geo = redisson.getGeo("test");
for (int i = 0; i < 10000; i++) {
geo.add(10 + 0.000001*i, 11 + 0.000001*i, "" + i);
}
Map<String, Double> res = geo.radiusWithDistance(10, 11, 200, GeoUnit.KILOMETERS);
assertThat(res).hasSize(10000);
}
@Test
public void testRadiusWithPositionHugeAmount() {
RGeo<String> geo = redisson.getGeo("test");
for (int i = 0; i < 10000; i++) {
geo.add(10 + 0.000001*i, 11 + 0.000001*i, "" + i);
}
Map<String, GeoPosition> res = geo.radiusWithPosition(10, 11, 200, GeoUnit.KILOMETERS);
assertThat(res).hasSize(10000);
}
@Test
public void testRadiusWithDistanceBigObject() {
RGeo<Map<String, String>> geo = redisson.getGeo("test");
Map<String, String> map = new HashMap<String, String>();
for (int i = 0; i < 150; i++) {
map.put("" + i, "" + i);
}
geo.add(new GeoEntry(13.361389, 38.115556, map));
Map<String, String> map1 = new HashMap<String, String>(map);
map1.remove("100");
geo.add(new GeoEntry(15.087269, 37.502669, map1));
Map<String, String> map2 = new HashMap<String, String>(map);
map2.remove("0");
geo.add(new GeoEntry(15.081269, 37.502169, map2));
Map<Map<String, String>, Double> expected = new HashMap<Map<String, String>, Double>();
expected.put(map, 190.4424);
expected.put(map1, 56.4413);
expected.put(map2, 56.3159);
Map<Map<String, String>, Double> res = geo.radiusWithDistance(15, 37, 200, GeoUnit.KILOMETERS);
assertThat(res.keySet()).containsOnlyElementsOf(expected.keySet());
assertThat(res.values()).containsOnlyElementsOf(expected.values());
}
@Test
public void testRadiusWithDistanceEmpty() {
RGeo<String> geo = redisson.getGeo("test");

@ -5,8 +5,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -47,6 +49,19 @@ public class RedissonSetCacheTest extends BaseTest {
assertThat(set.readAll()).isEmpty();
}
@Test
public void testAddBigBean() {
RSetCache<Map<Integer, Integer>> set = redisson.getSetCache("simple");
Map<Integer, Integer> map = new HashMap<Integer, Integer>();
for (int i = 0; i < 150; i++) {
map.put(i, i);
}
set.add(map);
map.remove(0);
set.add(map);
set.iterator().next();
}
@Test
public void testAddBean() throws InterruptedException, ExecutionException {
SimpleBean sb = new SimpleBean();

Loading…
Cancel
Save