Fixed - Permanent blocking calling threads #4459

Fixed - don't retry command if retryAttempts = 0 and retryInterval > 0
pull/4497/head
Nikita Koksharov 3 years ago
parent 705f68e868
commit 14d79672a7

@ -18,6 +18,7 @@ package org.redisson.command;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.util.Timeout;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.client.RedisConnection;
@ -114,7 +115,7 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
if (list.isEmpty()) {
writeFuture = connection.getChannel().newPromise();
attemptPromise.complete(null);
timeout.cancel();
timeout.ifPresent(Timeout::cancel);
return;
}

@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@ -75,7 +76,7 @@ public class RedisExecutor<V, R> {
NodeSource source;
Codec codec;
volatile int attempt;
volatile Timeout timeout;
volatile Optional<Timeout> timeout = Optional.empty();
volatile BiConsumer<R, Throwable> mainPromiseListener;
volatile ChannelFuture writeFuture;
volatile RedisException exception;
@ -126,7 +127,7 @@ public class RedisExecutor<V, R> {
mainPromiseListener = (r, e) -> {
if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
log.debug("Connection obtaining canceled for {}", command);
timeout.cancel();
timeout.ifPresent(Timeout::cancel);
if (attemptPromise.cancel(false)) {
free();
}
@ -143,6 +144,8 @@ public class RedisExecutor<V, R> {
scheduleRetryTimeout(connectionFuture, attemptPromise);
scheduleConnectionTimeout(attemptPromise, connectionFuture);
connectionFuture.whenComplete((connection, e) -> {
if (connectionFuture.isCancelled()) {
connectionManager.getShutdownLatch().release();
@ -152,16 +155,18 @@ public class RedisExecutor<V, R> {
if (connectionFuture.isDone() && connectionFuture.isCompletedExceptionally()) {
connectionManager.getShutdownLatch().release();
exception = convertException(connectionFuture);
if (attempt == attempts) {
attemptPromise.completeExceptionally(exception);
}
return;
}
sendCommand(attemptPromise, connection);
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(writeFuture, attemptPromise, connection);
}
scheduleWriteTimeout(attemptPromise);
writeFuture.addListener((ChannelFutureListener) future -> {
checkWriteFuture(writeFuture, attemptPromise, connection);
});
});
@ -172,7 +177,55 @@ public class RedisExecutor<V, R> {
});
}
private void scheduleConnectionTimeout(CompletableFuture<R> attemptPromise, CompletableFuture<RedisConnection> connectionFuture) {
if (retryInterval > 0 && attempts > 0) {
return;
}
timeout.ifPresent(Timeout::cancel);
TimerTask task = timeout -> {
if (connectionFuture.cancel(false)) {
exception = new RedisTimeoutException("Unable to acquire connection! " + this.connectionFuture +
"Increase connection pool size. "
+ "Node source: " + source
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts");
attemptPromise.completeExceptionally(exception);
}
};
timeout = Optional.of(connectionManager.newTimeout(task, responseTimeout, TimeUnit.MILLISECONDS));
}
private void scheduleWriteTimeout(CompletableFuture<R> attemptPromise) {
if (retryInterval > 0 && attempts > 0) {
return;
}
timeout.ifPresent(Timeout::cancel);
TimerTask task = timeout -> {
if (writeFuture.cancel(false)) {
exception = new RedisTimeoutException("Command still hasn't been written into connection! " +
"Check connection with Redis node: " + connectionFuture.join().getRedisClient().getAddr() +
" for TCP packet drops. Try to increase nettyThreads setting. "
+ " Node source: " + source + ", connection: " + connectionFuture.join()
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts");
attemptPromise.completeExceptionally(exception);
}
};
timeout = Optional.of(connectionManager.newTimeout(task, responseTimeout, TimeUnit.MILLISECONDS));
}
private void scheduleRetryTimeout(CompletableFuture<RedisConnection> connectionFuture, CompletableFuture<R> attemptPromise) {
if (retryInterval == 0 || attempts == 0) {
return;
}
TimerTask retryTimerTask = new TimerTask() {
@Override
@ -247,7 +300,7 @@ public class RedisExecutor<V, R> {
};
timeout = connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS);
timeout = Optional.of(connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS));
}
protected void free() {
@ -276,12 +329,12 @@ public class RedisExecutor<V, R> {
return;
}
timeout.cancel();
scheduleResponseTimeout(attemptPromise, connection);
}
private void scheduleResponseTimeout(CompletableFuture<R> attemptPromise, RedisConnection connection) {
timeout.ifPresent(Timeout::cancel);
long timeoutTime = responseTimeout;
if (command != null && command.isBlockingCommand()) {
Long popTimeout = null;
@ -334,7 +387,7 @@ public class RedisExecutor<V, R> {
+ LogHelper.toString(command, params) + ", channel: " + connection.getChannel()));
};
timeout = connectionManager.newTimeout(timeoutResponseTask, timeoutTime, TimeUnit.MILLISECONDS);
timeout = Optional.of(connectionManager.newTimeout(timeoutResponseTask, timeoutTime, TimeUnit.MILLISECONDS));
}
protected boolean isResendAllowed(int attempt, int attempts) {
@ -404,7 +457,8 @@ public class RedisExecutor<V, R> {
}
protected void checkAttemptPromise(CompletableFuture<R> attemptFuture, CompletableFuture<RedisConnection> connectionFuture) {
timeout.cancel();
timeout.ifPresent(Timeout::cancel);
if (attemptFuture.isCancelled()) {
return;
}

@ -172,11 +172,11 @@ public class RedissonBatchTest extends BaseTest {
RedissonClient redisson = Redisson.create(config);
ExecutorService executorService = Executors.newFixedThreadPool(5);
AtomicInteger counter = new AtomicInteger(5*150);
AtomicInteger counter = new AtomicInteger(5*15);
AtomicBoolean hasErrors = new AtomicBoolean();
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
for (int j = 0 ; j < 150; j++) {
for (int j = 0 ; j < 15; j++) {
executeBatch(redisson, batchOptions).whenComplete((r, e) -> {
if (e != null) {
hasErrors.set(true);
@ -198,7 +198,7 @@ public class RedissonBatchTest extends BaseTest {
public RFuture<BatchResult<?>> executeBatch(RedissonClient client, BatchOptions batchOptions) {
RBatch batch = client.createBatch(batchOptions);
for (int i = 0; i < 50; i++) {
for (int i = 0; i < 100000; i++) {
String key = "" + i;
batch.getBucket(key).getAsync();
}

Loading…
Cancel
Save