Fixed - RLocalCachedMap.putAll gets stuck. #1245

pull/1249/head
Nikita 7 years ago
parent 67cebd0e0f
commit 7ead223c93

@ -68,6 +68,7 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -76,7 +77,6 @@ import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
@ -239,6 +239,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't check existance", future.cause());
return;
}
@ -953,7 +954,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
final RPromise<Map<K, V>> promise = newPromise();
final RPromise<Map<K, V>> promise = new RedissonPromise<Map<K, V>>();
RFuture<Map<K, V>> future = super.getAllAsync(mapKeys);
future.addListener(new FutureListener<Map<K, V>>() {
@Override
@ -984,19 +985,16 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<Void> putAllOperationAsync(final Map<? extends K, ? extends V> map) {
protected RFuture<Void> putAllOperationAsync(final Map<? extends K, ? extends V> map) {
List<Object> params = new ArrayList<Object>(map.size()*3);
params.add(invalidateEntryOnChange);
params.add(map.size()*2);
byte[][] hashes = new byte[map.size()][];
int i = 0;
int payloadSize = 0;
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
ByteBuf mapKey = encodeMapKey(t.getKey());
payloadSize += mapKey.readableBytes();
ByteBuf mapValue = encodeMapValue(t.getValue());
payloadSize += mapValue.readableBytes();
params.add(mapKey);
params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey);
@ -1004,7 +1002,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
i++;
}
ByteBuf msgEncoded;
ByteBuf msgEncoded = null;
if (syncStrategy == SyncStrategy.UPDATE) {
List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>();
for (int j = 2; j < params.size(); j += 2) {
@ -1024,23 +1022,25 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] entryId = generateLogEntryId(hash);
params.add(time);
params.add(entryId);
payloadSize += entryId.length + 8;
}
}
params.add(msgEncoded);
payloadSize += msgEncoded.readableBytes();
log.debug("Payload size passed to putAll method: {}", payloadSize);
if (msgEncoded != null) {
params.add(msgEncoded);
}
final RPromise<Void> result = newPromise();
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
"for i=3, tonumber(ARGV[2]) + 2, 5000 do "
+ "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, tonumber(ARGV[2]) + 2))); "
+ "end; "
+ "if ARGV[1] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;"
+ "if ARGV[1] == '2' then "
+ "redis.call('zadd', KEYS[3], unpack(ARGV, tonumber(ARGV[2]) + 2 + 1, #ARGV - 1));"
+ "for i=tonumber(ARGV[2]) + 2 + 1, #ARGV - 1, 5000 do "
+ "redis.call('hmset', KEYS[3], unpack(ARGV, i, math.min(i+4999, #ARGV - 1))); "
+ "end; "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
@ -1050,6 +1050,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
@ -1125,7 +1126,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
result.add((V) value.getValue());
}
final RPromise<Collection<V>> promise = newPromise();
final RPromise<Collection<V>> promise = new RedissonPromise<Collection<V>>();
RFuture<Collection<V>> future = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS,
"local entries = redis.call('hgetall', KEYS[1]); "
+ "local result = {};"
@ -1172,13 +1173,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
result.put((K)value.getKey(), (V)value.getValue());
}
final RPromise<Map<K, V>> promise = newPromise();
final RPromise<Map<K, V>> promise = new RedissonPromise<Map<K, V>>();
RFuture<Map<K, V>> future = readAll(ALL_MAP, mapKeys, result);
future.addListener(new FutureListener<Map<K, V>>() {
@Override
public void operationComplete(Future<Map<K, V>> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
@ -1215,13 +1217,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
result.add(new AbstractMap.SimpleEntry<K, V>((K)value.getKey(), (V)value.getValue()));
}
final RPromise<Set<Entry<K, V>>> promise = newPromise();
final RPromise<Set<Entry<K, V>>> promise = new RedissonPromise<Set<Entry<K, V>>>();
RFuture<Set<Entry<K, V>>> future = readAll(ALL_ENTRIES, mapKeys, result);
future.addListener(new FutureListener<Set<Entry<K, V>>>() {
@Override
public void operationComplete(Future<Set<Entry<K, V>>> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

@ -126,24 +126,18 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
return redisson.getLocalCachedMap("test", options);
}
// @Test
public void testBigData() throws InterruptedException {
@Test
public void testBigPutAll() throws InterruptedException {
RLocalCachedMap<Object, Object> m = redisson.getLocalCachedMap("testValuesWithNearCache2",
LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU));
LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).syncStrategy(SyncStrategy.INVALIDATE));
for (int i = 0; i < 100; i++) {
for (int k = 0; k < 1000; k++) {
Map<Object, Object> map = new HashMap<>();
map.put("" + k * i, "" + k * i);
m.putAll(map);
}
System.out.println(i);
Map<Object, Object> map = new HashMap<>();
for (int k = 0; k < 10000; k++) {
map.put("" + k, "" + k);
}
m.putAll(map);
System.out.println("done");
Thread.sleep(1000000);
assertThat(m.size()).isEqualTo(10000);
}

Loading…
Cancel
Save