Fixed - Class cast exception is thrown during iteration of RMapCache entries #3156

pull/3438/head
Nikita Koksharov 4 years ago
parent 14c26e3171
commit 6b1785e810

@ -22,10 +22,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.CodecDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.StreamInfoDecoder;
import org.redisson.client.protocol.decoder.*;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
@ -166,6 +163,12 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
});
}
private static final RedisCommand<org.redisson.api.StreamInfo<Object, Object>> XINFO_STREAM = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new StreamInfoDecoder(),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectMapDecoder(false)));
@Override
public Flux<ReactiveRedisConnection.CommandResponse<XInfoCommand, StreamInfo.XInfoStream>> xInfo(Publisher<XInfoCommand> publisher) {
return execute(publisher, command -> {
@ -174,13 +177,7 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
byte[] k = toByteArray(command.getKey());
RedisCommand<org.redisson.api.StreamInfo<Object, Object>> xinfoStreamCommand = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new StreamInfoDecoder(),
new CodecDecoder(),
new ObjectMapDecoder(ByteArrayCodec.INSTANCE, false)));
Mono<org.redisson.api.StreamInfo<byte[], byte[]>> m = write(k, StringCodec.INSTANCE, xinfoStreamCommand, k);
Mono<org.redisson.api.StreamInfo<byte[], byte[]>> m = write(k, ByteArrayCodec.INSTANCE, XINFO_STREAM, k);
return m.map(i -> {
Map<String, Object> res = new HashMap<>();

@ -156,17 +156,18 @@ public class RedissonStreamCommands implements RedisStreamCommands {
}
}
private static final RedisCommand<org.redisson.api.StreamInfo<Object, Object>> XINFO_STREAM = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new XInfoStreamReplayDecoder(),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectMapDecoder(false)));
@Override
public StreamInfo.XInfoStream xInfo(byte[] key) {
Assert.notNull(key, "Key must not be null!");
RedisCommand<org.redisson.api.StreamInfo<Object, Object>> xinfoStreamCommand = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new XInfoStreamReplayDecoder(),
new CodecDecoder(),
new ObjectMapDecoder(ByteArrayCodec.INSTANCE, false)));
return connection.write(key, StringCodec.INSTANCE, xinfoStreamCommand, key);
return connection.write(key, ByteArrayCodec.INSTANCE, XINFO_STREAM, key);
}
private static class XInfoGroupsReplayDecoder implements MultiDecoder<StreamInfo.XInfoGroups> {

@ -22,10 +22,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.CodecDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.StreamInfoDecoder;
import org.redisson.client.protocol.decoder.*;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
@ -166,6 +163,12 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
});
}
private static final RedisCommand<org.redisson.api.StreamInfo<Object, Object>> XINFO_STREAM = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new StreamInfoDecoder(),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectMapDecoder(false)));
@Override
public Flux<ReactiveRedisConnection.CommandResponse<XInfoCommand, StreamInfo.XInfoStream>> xInfo(Publisher<XInfoCommand> publisher) {
return execute(publisher, command -> {
@ -174,13 +177,7 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
byte[] k = toByteArray(command.getKey());
RedisCommand<org.redisson.api.StreamInfo<Object, Object>> xinfoStreamCommand = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new StreamInfoDecoder(),
new CodecDecoder(),
new ObjectMapDecoder(ByteArrayCodec.INSTANCE, false)));
Mono<org.redisson.api.StreamInfo<byte[], byte[]>> m = write(k, StringCodec.INSTANCE, xinfoStreamCommand, k);
Mono<org.redisson.api.StreamInfo<byte[], byte[]>> m = write(k, ByteArrayCodec.INSTANCE, XINFO_STREAM, k);
return m.map(i -> {
Map<String, Object> res = new HashMap<>();

@ -156,17 +156,17 @@ public class RedissonStreamCommands implements RedisStreamCommands {
}
}
private static final RedisCommand<org.redisson.api.StreamInfo<Object, Object>> XINFO_STREAM = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new XInfoStreamReplayDecoder(),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectMapDecoder(false)));
@Override
public StreamInfo.XInfoStream xInfo(byte[] key) {
Assert.notNull(key, "Key must not be null!");
RedisCommand<org.redisson.api.StreamInfo<Object, Object>> xinfoStreamCommand = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new XInfoStreamReplayDecoder(),
new CodecDecoder(),
new ObjectMapDecoder(ByteArrayCodec.INSTANCE, false)));
return connection.write(key, StringCodec.INSTANCE, xinfoStreamCommand, key);
return connection.write(key, ByteArrayCodec.INSTANCE, XINFO_STREAM, key);
}
private static class XInfoGroupsReplayDecoder implements MultiDecoder<StreamInfo.XInfoGroups> {

@ -1392,6 +1392,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return get(scanIteratorAsync(name, client, startPos, pattern, count));
}
private static final RedisCommand<MapCacheScanResult<Object, Object>> SCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder2(
new MapCacheScanResultReplayDecoder(),
new MapEntriesDecoder(new ObjectMapDecoder(true))));
@Override
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>();
@ -1402,11 +1407,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
params.add(count);
RedisCommand<MapCacheScanResult<Object, Object>> command = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder2(
new MapCacheScanResultReplayDecoder(),
new MapEntriesDecoder(new ObjectMapDecoder(codec, true))));
RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, command,
RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, SCAN,
"local result = {}; "
+ "local idleKeys = {}; "
+ "local res; "

@ -1007,15 +1007,16 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
return get(getInfoAsync());
}
private static final RedisCommand<StreamInfo<Object, Object>> XINFO_STREAM = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new StreamInfoDecoder(),
new CodecDecoder(),
new ObjectMapDecoder(false)));
@Override
public RFuture<StreamInfo<K, V>> getInfoAsync() {
RedisCommand<StreamInfo<Object, Object>> xinfoStreamCommand = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new StreamInfoDecoder(),
new CodecDecoder(),
new ObjectMapDecoder(getCodec(), false)));
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, xinfoStreamCommand, getName());
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, XINFO_STREAM, getName());
}
@Override

@ -26,9 +26,19 @@ public class State {
private int level = -1;
private Object value;
public State() {
}
public <T> T getValue() {
return (T) value;
}
public void setValue(Object value) {
this.value = value;
}
public int getLevel() {
return level;
}

@ -30,44 +30,39 @@ import org.redisson.client.protocol.Decoder;
*/
public class ObjectMapDecoder implements MultiDecoder<Object> {
private final Codec codec;
private final boolean decodeList;
public ObjectMapDecoder(Codec codec, boolean decodeList) {
public ObjectMapDecoder(boolean decodeList) {
super();
this.codec = codec;
this.decodeList = decodeList;
}
private int pos;
private boolean mapDecoded;
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if (mapDecoded) {
return this.codec.getMapKeyDecoder();
if (state.getValue() != null && (Boolean) state.getValue()) {
return codec.getMapKeyDecoder();
}
if (pos++ % 2 == 0) {
return this.codec.getMapKeyDecoder();
if (paramNum % 2 == 0) {
return codec.getMapKeyDecoder();
}
return this.codec.getMapValueDecoder();
return codec.getMapValueDecoder();
}
@Override
public Object decode(List<Object> parts, State state) {
if (decodeList && mapDecoded) {
if (decodeList && (state.getValue() != null && (Boolean) state.getValue())) {
return parts;
}
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
Map<Object, Object> result = new LinkedHashMap<>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
result.put(parts.get(i-1), parts.get(i));
}
}
mapDecoded = true;
state.setValue(true);
return result;
}

Loading…
Cancel
Save