Fixed - CommandAsyncService blocks indefinitely if MapLoader's methods throw exception

pull/1705/head
Nikita 7 years ago
parent 83235853bf
commit c38fecd9a4

@ -54,6 +54,8 @@ import org.redisson.mapreduce.RedissonMapReduce;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -70,6 +72,8 @@ import io.netty.util.concurrent.FutureListener;
*/ */
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> { public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
private final Logger log = LoggerFactory.getLogger(getClass());
final AtomicInteger writeBehindCurrentThreads = new AtomicInteger(); final AtomicInteger writeBehindCurrentThreads = new AtomicInteger();
final Queue<Runnable> writeBehindTasks; final Queue<Runnable> writeBehindTasks;
final RedissonClient redisson; final RedissonClient redisson;
@ -739,7 +743,14 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override @Override
public RFuture<Void> loadAllAsync(boolean replaceExistingValues, int parallelism) { public RFuture<Void> loadAllAsync(boolean replaceExistingValues, int parallelism) {
return loadAllAsync(options.getLoader().loadAllKeys(), replaceExistingValues, parallelism, null); Iterable<K> keys;
try {
keys = options.getLoader().loadAllKeys();
} catch (Exception e) {
log.error("Unable to load keys for map " + getName(), e);
return RedissonPromise.newFailedFuture(e);
}
return loadAllAsync(keys, replaceExistingValues, parallelism, null);
} }
@Override @Override
@ -763,6 +774,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
final RPromise<Void> result = new RedissonPromise<Void>(); final RPromise<Void> result = new RedissonPromise<Void>();
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
try {
final Iterator<? extends K> iter = keys.iterator(); final Iterator<? extends K> iter = keys.iterator();
for (int i = 0; i < parallelism; i++) { for (int i = 0; i < parallelism; i++) {
if (!iter.hasNext()) { if (!iter.hasNext()) {
@ -780,6 +792,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkAndLoadValue(result, counter, iter, key, loadedEntires); checkAndLoadValue(result, counter, iter, key, loadedEntires);
} }
} }
} catch (Exception e) {
log.error("Unable to load keys for map " + getName(), e);
return RedissonPromise.newFailedFuture(e);
}
return result; return result;
} }
@ -1288,11 +1304,18 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
final V value = options.getLoader().load(key); final V value;
try {
value = options.getLoader().load(key);
if (value == null) { if (value == null) {
unlock(result, lock, threadId, value); unlock(result, lock, threadId, value);
return; return;
} }
} catch (Exception e) {
log.error("Unable to load value by key " + key + " for map " + getName(), e);
unlock(result, lock, threadId, null);
return;
}
externalPutAsync(key, value).addListener(new FutureListener<V>() { externalPutAsync(key, value).addListener(new FutureListener<V>() {
@Override @Override

@ -1229,6 +1229,19 @@ public abstract class BaseMapTest extends BaseTest {
}; };
} }
@Test
public void testMapLoaderGetWithException() {
Map<String, String> cache = new HashMap<String, String>() {
@Override
public String get(Object key) {
throw new RuntimeException();
};
};
RMap<String, String> map = getLoaderTestMap("test", cache);
assertThat(map.get("1")).isNull();
}
@Test @Test
public void testMapLoaderGet() { public void testMapLoaderGet() {
Map<String, String> cache = new HashMap<String, String>(); Map<String, String> cache = new HashMap<String, String>();

Loading…
Cancel
Save