Fixed - ByteBuf leaks when one of multiple parameters can't be encoded. #4576

pull/4608/head
Nikita Koksharov 2 years ago
parent 7653aa1db1
commit c8644061d4

@ -107,7 +107,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
}
private RFuture<Long> addAsync(String subCommand, GeoEntry... entries) {
List<Object> params = new ArrayList<Object>(entries.length + 2);
List<Object> params = new ArrayList<>(entries.length + 2);
params.add(getRawName());
if (!subCommand.isEmpty()) {
params.add(subCommand);
@ -115,7 +115,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
for (GeoEntry entry : entries) {
params.add(entry.getLongitude());
params.add(entry.getLatitude());
params.add(encode(entry.getMember()));
params.add(encode(params, entry.getMember()));
}
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.GEOADD, params.toArray());
}
@ -187,10 +187,10 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, String>> hashAsync(V... members) {
List<Object> params = new ArrayList<Object>(members.length + 1);
List<Object> params = new ArrayList<>(members.length + 1);
params.add(getRawName());
for (Object member : members) {
params.add(encode(member));
params.add(encode(params, member));
}
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOHASH",
new MapGetAllDecoder((List<Object>) Arrays.asList(members), 0));
@ -204,10 +204,10 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> posAsync(V... members) {
List<Object> params = new ArrayList<Object>(members.length + 1);
List<Object> params = new ArrayList<>(members.length + 1);
params.add(getRawName());
for (Object member : members) {
params.add(encode(member));
params.add(encode(params, member));
}
MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder2(

@ -16,7 +16,6 @@
package org.redisson;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
@ -827,14 +826,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
continue;
}
try {
mapKeys.add(encodeMapKey(value.getKey()));
} catch (Exception e) {
mapKeys.forEach(v -> {
ReferenceCountUtil.safeRelease(v);
});
throw e;
}
mapKeys.add(encodeMapKey(value.getKey(), mapKeys));
result.add((V) value.getValue());
}
@ -877,14 +869,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (value == null) {
continue;
}
try {
mapKeys.add(encodeMapKey(value.getKey()));
} catch (Exception e) {
mapKeys.forEach(v -> {
ReferenceCountUtil.safeRelease(v);
});
throw e;
}
mapKeys.add(encodeMapKey(value.getKey(), mapKeys));
result.put((K) value.getKey(), (V) value.getValue());
}
@ -940,14 +925,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
continue;
}
try {
mapKeys.add(encodeMapKey(value.getKey()));
} catch (Exception e) {
mapKeys.forEach(v -> {
ReferenceCountUtil.safeRelease(v);
});
throw e;
}
mapKeys.add(encodeMapKey(value.getKey(), mapKeys));
result.add(new AbstractMap.SimpleEntry<K, V>((K) value.getKey(), (V) value.getValue()));
}

@ -244,18 +244,10 @@ public abstract class RedissonObject implements RObject {
return codec;
}
protected List<ByteBuf> encode(Object... values) {
List<ByteBuf> result = new ArrayList<>(values.length);
for (Object object : values) {
result.add(encode(object));
}
return result;
}
protected List<ByteBuf> encode(Collection<?> values) {
List<ByteBuf> result = new ArrayList<>(values.size());
for (Object object : values) {
result.add(encode(object));
result.add(encode(result, object));
}
return result;
}
@ -320,11 +312,33 @@ public abstract class RedissonObject implements RObject {
public ByteBuf encode(Object value) {
return commandExecutor.encode(codec, value);
}
public ByteBuf encode(Collection<?> params, Object value) {
try {
return commandExecutor.encode(codec, value);
} catch (Exception e) {
params.forEach(v -> {
ReferenceCountUtil.safeRelease(v);
});
throw e;
}
}
public ByteBuf encodeMapKey(Object value) {
return commandExecutor.encodeMapKey(codec, value);
}
public ByteBuf encodeMapKey(Object value, Collection<Object> params) {
try {
return encodeMapKey(value);
} catch (Exception e) {
params.forEach(v -> {
ReferenceCountUtil.safeRelease(v);
});
throw e;
}
}
public ByteBuf encodeMapValue(Object value) {
return commandExecutor.encodeMapValue(codec, value);
}

@ -54,15 +54,15 @@ public class RedissonQueueSemaphore extends RedissonSemaphore {
public RFuture<Boolean> tryAcquireAsync(int permits) {
List<Object> params;
if (values != null) {
params = new ArrayList<Object>(values.size() + 1);
params = new ArrayList<>(values.size() + 1);
params.add(values.size());
for (Object value : values) {
params.add(encode(value));
params.add(encode(params, value));
}
} else {
params = new ArrayList<Object>(2);
params = new ArrayList<>(2);
params.add(1);
params.add(encode(value));
params.add(encode(params, value));
}
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +

@ -400,7 +400,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<List<Integer>> addAndGetRevRankAsync(Map<? extends V, Double> map) {
final List<Object> params = new ArrayList<Object>(map.size() * 2);
final List<Object> params = new ArrayList<>(map.size() * 2);
for (java.util.Map.Entry<? extends V, Double> t : map.entrySet()) {
if (t.getKey() == null) {
throw new NullPointerException("map key can't be null");
@ -408,7 +408,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
if (t.getValue() == null) {
throw new NullPointerException("map value can't be null");
}
params.add(encode(t.getKey()));
params.add(encode(params, t.getKey()));
params.add(BigDecimal.valueOf(t.getValue()).toPlainString());
}
@ -521,11 +521,11 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
if (objects.isEmpty()) {
return new CompletableFutureWrapper<>(0);
}
List<Object> params = new ArrayList<Object>(objects.size()*2+1);
List<Object> params = new ArrayList<>(objects.size() * 2 + 1);
params.add(getRawName());
for (Entry<V, Double> entry : objects.entrySet()) {
params.add(BigDecimal.valueOf(entry.getValue()).toPlainString());
params.add(encode(entry.getKey()));
params.add(encode(params, entry.getKey()));
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZADD_INT, params.toArray());
@ -546,7 +546,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
params.add("NX");
for (Entry<V, Double> entry : objects.entrySet()) {
params.add(BigDecimal.valueOf(entry.getValue()).toPlainString());
params.add(encode(entry.getKey()));
params.add(encode(params, entry.getKey()));
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZADD_INT, params.toArray());
@ -568,7 +568,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
params.add("CH");
for (Entry<V, Double> entry : objects.entrySet()) {
params.add(BigDecimal.valueOf(entry.getValue()).toPlainString());
params.add(encode(entry.getKey()));
params.add(encode(params, entry.getKey()));
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZADD_INT, params.toArray());
@ -590,7 +590,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
params.add("CH");
for (Entry<V, Double> entry : objects.entrySet()) {
params.add(BigDecimal.valueOf(entry.getValue()).toPlainString());
params.add(encode(entry.getKey()));
params.add(encode(params, entry.getKey()));
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZADD_INT, params.toArray());
@ -612,7 +612,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
params.add("CH");
for (Entry<V, Double> entry : objects.entrySet()) {
params.add(BigDecimal.valueOf(entry.getValue()).toPlainString());
params.add(encode(entry.getKey()));
params.add(encode(params, entry.getKey()));
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.ZADD_INT, params.toArray());
@ -946,10 +946,10 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return deleteAsync();
}
List<Object> params = new ArrayList<Object>(c.size()*2);
List<Object> params = new ArrayList<>(c.size() * 2);
for (Object object : c) {
params.add(0);
params.add(encode((V) object));
params.add(encode(params, object));
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -958,7 +958,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
+ "local size = redis.call('zinterstore', KEYS[1], 2, KEYS[1], KEYS[2], 'aggregate', 'sum');"
+ "redis.call('del', KEYS[2]); "
+ "return size ~= prevSize and 1 or 0; ",
Arrays.<Object>asList(getRawName(), "redisson_temp__{" + getRawName() + "}"), params.toArray());
Arrays.asList(getRawName(), "redisson_temp__{" + getRawName() + "}"), params.toArray());
}
@Override
@ -982,7 +982,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"redis.call('zincrby', KEYS[1], ARGV[1], ARGV[2]); "
+"return redis.call('zrank', KEYS[1], ARGV[2]); ",
Collections.<Object>singletonList(getRawName()), new BigDecimal(value.toString()).toPlainString(), encode(object));
Collections.singletonList(getRawName()), new BigDecimal(value.toString()).toPlainString(), encode(object));
}
@Override
@ -995,7 +995,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"redis.call('zincrby', KEYS[1], ARGV[1], ARGV[2]); "
+"return redis.call('zrevrank', KEYS[1], ARGV[2]); ",
Collections.<Object>singletonList(getRawName()), new BigDecimal(value.toString()).toPlainString(), encode(object));
Collections.singletonList(getRawName()), new BigDecimal(value.toString()).toPlainString(), encode(object));
}
@Override
@ -1194,7 +1194,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> intersectionAsync(Aggregate aggregate, String... names) {
List<Object> args = new ArrayList<Object>(names.length + 4);
List<Object> args = new ArrayList<>(names.length + 4);
args.add(getRawName());
args.add(names.length);
args.addAll(Arrays.asList(names));
@ -1220,12 +1220,12 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> intersectionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight) {
List<Object> args = new ArrayList<Object>(nameWithWeight.size()*2 + 5);
List<Object> args = new ArrayList<>(nameWithWeight.size() * 2 + 5);
args.add(getRawName());
args.add(nameWithWeight.size());
args.addAll(nameWithWeight.keySet());
args.add("WEIGHTS");
List<String> weights = new ArrayList<String>();
List<String> weights = new ArrayList<>();
for (Double weight : nameWithWeight.values()) {
weights.add(BigDecimal.valueOf(weight).toPlainString());
}
@ -1252,7 +1252,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Collection<V>> readIntersectionAsync(Aggregate aggregate, String... names) {
List<Object> args = new ArrayList<Object>(names.length + 4);
List<Object> args = new ArrayList<>(names.length + 4);
args.add(names.length + 1);
args.add(getRawName());
args.addAll(Arrays.asList(names));
@ -1278,12 +1278,12 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Collection<V>> readIntersectionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight) {
List<Object> args = new ArrayList<Object>(nameWithWeight.size()*2 + 5);
List<Object> args = new ArrayList<>(nameWithWeight.size() * 2 + 5);
args.add(nameWithWeight.size() + 1);
args.add(getRawName());
args.addAll(nameWithWeight.keySet());
args.add("WEIGHTS");
List<String> weights = new ArrayList<String>();
List<String> weights = new ArrayList<>();
for (Double weight : nameWithWeight.values()) {
weights.add(BigDecimal.valueOf(weight).toPlainString());
}
@ -1338,7 +1338,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> unionAsync(Aggregate aggregate, String... names) {
List<Object> args = new ArrayList<Object>(names.length + 4);
List<Object> args = new ArrayList<>(names.length + 4);
args.add(getRawName());
args.add(names.length);
args.addAll(Arrays.asList(names));
@ -1364,12 +1364,12 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> unionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight) {
List<Object> args = new ArrayList<Object>(nameWithWeight.size()*2 + 5);
List<Object> args = new ArrayList<>(nameWithWeight.size() * 2 + 5);
args.add(getRawName());
args.add(nameWithWeight.size());
args.addAll(nameWithWeight.keySet());
args.add("WEIGHTS");
List<String> weights = new ArrayList<String>();
List<String> weights = new ArrayList<>();
for (Double weight : nameWithWeight.values()) {
weights.add(BigDecimal.valueOf(weight).toPlainString());
}
@ -1422,12 +1422,12 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Collection<V>> readUnionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight) {
List<Object> args = new ArrayList<Object>(nameWithWeight.size()*2 + 5);
List<Object> args = new ArrayList<>(nameWithWeight.size() * 2 + 5);
args.add(nameWithWeight.size() + 1);
args.add(getRawName());
args.addAll(nameWithWeight.keySet());
args.add("WEIGHTS");
List<String> weights = new ArrayList<String>();
List<String> weights = new ArrayList<>();
for (Double weight : nameWithWeight.values()) {
weights.add(BigDecimal.valueOf(weight).toPlainString());
}
@ -1479,7 +1479,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public <T> Collection<T> readSort(String byPattern, List<String> getPatterns, SortOrder order) {
return (Collection<T>) get(readSortAsync(byPattern, getPatterns, order));
return get(readSortAsync(byPattern, getPatterns, order));
}
@Override
@ -1489,7 +1489,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public <T> Collection<T> readSort(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
return (Collection<T>) get(readSortAsync(byPattern, getPatterns, order, offset, count));
return get(readSortAsync(byPattern, getPatterns, order, offset, count));
}
@Override
@ -1519,12 +1519,12 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public <T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order) {
return (Collection<T>) get(readSortAlphaAsync(byPattern, getPatterns, order));
return get(readSortAlphaAsync(byPattern, getPatterns, order));
}
@Override
public <T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
return (Collection<T>) get(readSortAlphaAsync(byPattern, getPatterns, order, offset, count));
return get(readSortAlphaAsync(byPattern, getPatterns, order, offset, count));
}
@Override
@ -1564,7 +1564,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> sortToAsync(String destName, SortOrder order) {
return sortToAsync(destName, null, Collections.<String>emptyList(), order, -1, -1);
return sortToAsync(destName, null, Collections.emptyList(), order, -1, -1);
}
@Override
@ -1574,7 +1574,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> sortToAsync(String destName, SortOrder order, int offset, int count) {
return sortToAsync(destName, null, Collections.<String>emptyList(), order, offset, count);
return sortToAsync(destName, null, Collections.emptyList(), order, offset, count);
}
@Override
@ -1589,12 +1589,12 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, SortOrder order) {
return sortToAsync(destName, byPattern, Collections.<String>emptyList(), order, -1, -1);
return sortToAsync(destName, byPattern, Collections.emptyList(), order, -1, -1);
}
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, SortOrder order, int offset, int count) {
return sortToAsync(destName, byPattern, Collections.<String>emptyList(), order, offset, count);
return sortToAsync(destName, byPattern, Collections.emptyList(), order, offset, count);
}
@Override
@ -1614,7 +1614,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
List<Object> params = new ArrayList<Object>();
List<Object> params = new ArrayList<>();
params.add(getRawName());
if (byPattern != null) {
params.add("BY");
@ -1641,7 +1641,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
}
private <T> RFuture<Collection<T>> readSortAsync(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count, boolean alpha) {
List<Object> params = new ArrayList<Object>();
List<Object> params = new ArrayList<>();
params.add(getRawName());
if (byPattern != null) {
params.add("BY");
@ -1679,7 +1679,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Collection<V>> readDiffAsync(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 2);
List<Object> args = new ArrayList<>(names.length + 2);
args.add(names.length + 1);
args.add(getRawName());
args.addAll(Arrays.asList(names));

@ -741,7 +741,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
"redis.call('sadd', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); " +
"end; " +
"return 1; ",
Arrays.asList(getRawName()), encode(values).toArray());
Arrays.asList(getRawName()), encode(Arrays.asList(values).toArray()));
}
@Override

@ -265,7 +265,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
List<Object> params = new ArrayList<>();
params.add(System.currentTimeMillis());
params.add(timeoutDate);
params.addAll(encode(values));
params.addAll(encode(Arrays.asList(values)));
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
"for i, v in ipairs(ARGV) do " +
@ -363,10 +363,10 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
long score = 92233720368547758L - System.currentTimeMillis();
List<Object> params = new ArrayList<Object>(c.size()*2);
List<Object> params = new ArrayList<>(c.size() * 2);
for (Object object : c) {
params.add(score);
params.add(encode((V) object));
params.add(encode(params, object));
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,

@ -117,7 +117,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
byte[] random = new byte[16];
ThreadLocalRandom.current().nextBytes(random);
params.add(random);
params.add(encode(entry.getValue()));
params.add(encode(params, entry.getValue()));
}
if (timeToLive > 0) {

Loading…
Cancel
Save