Fixed - ExecutionException handling.

pull/5711/head
Nikita Koksharov 1 year ago
parent 8a0bb45ac9
commit d36259bdb8

@ -1001,7 +1001,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RExecutorBatchFuture future = submit(tasks.toArray(new Callable[0]));
try {
future.toCompletableFuture().get(timeout, unit);
} catch (ExecutionException | TimeoutException | CancellationException e) {
} catch (ExecutionException e) {
LOGGER.error(e.getMessage(), e);
} catch (TimeoutException | CancellationException e) {
// skip
}
List<?> futures = future.getTaskFutures();

@ -24,6 +24,8 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.pubsub.LockPubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
@ -44,6 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RedissonLock extends RedissonBaseLock {
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonLock.class);
protected long internalLockLeaseTime;
protected final LockPubSub pubSub;
@ -257,6 +261,7 @@ public class RedissonLock extends RedissonBaseLock {
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
LOGGER.error(e.getMessage(), e);
acquireFailed(waitTime, unit, threadId);
return false;
}

@ -26,6 +26,8 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.pubsub.SemaphorePubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
@ -39,6 +41,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RedissonPermitExpirableSemaphore extends RedissonExpirable implements RPermitExpirableSemaphore {
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonPermitExpirableSemaphore.class);
private final String channelName;
private final SemaphorePubSub semaphorePubSub;
@ -464,7 +468,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
RedissonLockEntry entry;
try {
entry = future.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
} catch (ExecutionException e) {
LOGGER.error(e.getMessage(), e);
return Collections.emptyList();
} catch (TimeoutException e) {
return Collections.emptyList();
}

@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
private static final Logger log = LoggerFactory.getLogger(RSemaphore.class);
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonSemaphore.class);
private final SemaphorePubSub semaphorePubSub;
@ -283,19 +283,19 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
log.debug("trying to acquire, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
LOGGER.debug("trying to acquire, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
if (tryAcquire(permits)) {
log.debug("acquired, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
LOGGER.debug("acquired, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
LOGGER.debug("unable to acquire, permits: {}, name: {}", permits, getName());
return false;
}
@ -304,40 +304,43 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
RedissonLockEntry entry;
try {
entry = future.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | CancellationException | TimeoutException e) {
log.debug("unable to subscribe for permits acquisition, permits: {}, name: {}", permits, getName());
} catch (ExecutionException e) {
LOGGER.error(e.getMessage(), e);
return false;
} catch (TimeoutException | CancellationException e) {
LOGGER.debug("unable to subscribe for permits acquisition, permits: {}, name: {}", permits, getName());
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
LOGGER.debug("unable to acquire, permits: {}, name: {}", permits, getName());
return false;
}
while (true) {
current = System.currentTimeMillis();
if (tryAcquire(permits)) {
log.debug("acquired, permits: {}, wait-time: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
LOGGER.debug("acquired, permits: {}, wait-time: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
LOGGER.debug("unable to acquire, permits: {}, name: {}", permits, getName());
return false;
}
// waiting for message
current = System.currentTimeMillis();
log.debug("wait for acquisition, permits: {}, wait-time(ms): {}, name: {}", permits, time, getName());
LOGGER.debug("wait for acquisition, permits: {}, wait-time(ms): {}, name: {}", permits, time, getName());
entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
time -= System.currentTimeMillis() - current;
if (time <= 0) {
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
LOGGER.debug("unable to acquire, permits: {}, name: {}", permits, getName());
return false;
}
}
@ -439,9 +442,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call(ARGV[2], KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits, getSubscribeService().getPublishCommand());
if (log.isDebugEnabled()) {
if (LOGGER.isDebugEnabled()) {
future.thenAccept(o -> {
log.debug("released, permits: {}, name: {}", permits, getName());
LOGGER.debug("released, permits: {}, name: {}", permits, getName());
});
}
return future;
@ -492,12 +495,12 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
Arrays.asList(getRawName(), getChannelName()),
permits, getSubscribeService().getPublishCommand());
if (log.isDebugEnabled()) {
if (LOGGER.isDebugEnabled()) {
future.thenAccept(r -> {
if (r) {
log.debug("permits set, permits: {}, name: {}", permits, getName());
LOGGER.debug("permits set, permits: {}, name: {}", permits, getName());
} else {
log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
LOGGER.debug("unable to set permits, permits: {}, name: {}", permits, getName());
}
});
}

@ -23,6 +23,7 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import java.io.Serializable;
@ -104,7 +105,9 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
if (timeout > 0) {
try {
mapperFuture.toCompletableFuture().get(timeout - timeSpent, TimeUnit.MILLISECONDS);
} catch (ExecutionException | CancellationException | TimeoutException e) {
} catch (ExecutionException e) {
throw ((Redisson) redisson).getCommandExecutor().convertException(e);
} catch (TimeoutException e) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
}
@ -112,7 +115,9 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
if (timeout == 0) {
try {
mapperFuture.toCompletableFuture().join();
} catch (CompletionException | CancellationException e) {
} catch (CompletionException e) {
throw new RedisException(e.getCause());
} catch (CancellationException e) {
// skip
}
}

Loading…
Cancel
Save