Fixed - Spring Data Redis connection in multi mode may cause thread hang #2927

pull/970/merge
Nikita Koksharov 5 years ago
parent 981ce2e4b1
commit 5706e92744

@ -15,11 +15,14 @@
*/
package org.redisson.command;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.BatchCommandData;
import org.redisson.client.protocol.CommandData;
@ -38,8 +41,10 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
*
@ -172,7 +177,11 @@ public class CommandBatchService extends CommandAsyncService {
});
return promise;
}
public boolean isExecuted() {
return executed.get();
}
public RFuture<BatchResult<?>> executeAsync() {
if (executed.get()) {
throw new IllegalStateException("Batch already executed!");
@ -300,9 +309,35 @@ public class CommandBatchService extends CommandAsyncService {
}
RPromise<R> resultPromise = new RedissonPromise<R>();
Timeout timeout;
if (semaphore.getCounter() < permits) {
long responseTimeout;
if (options.getResponseTimeout() > 0) {
responseTimeout = options.getResponseTimeout();
} else {
responseTimeout = connectionManager.getConfig().getTimeout();
}
timeout = connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
resultPromise.tryFailure(new RedisTimeoutException("Response timeout for queued commands " + responseTimeout + ": " +
commands.values().stream()
.flatMap(e -> e.getCommands().stream().map(d -> d.getCommand()))
.collect(Collectors.toList())));
}
}, responseTimeout, TimeUnit.MILLISECONDS);
} else {
timeout = null;
}
semaphore.acquire(new Runnable() {
@Override
public void run() {
if (timeout != null) {
timeout.cancel();
}
for (Entry entry : commands.values()) {
for (BatchCommandData<?, ?> command : entry.getCommands()) {
if (command.getPromise().isDone() && !command.getPromise().isSuccess()) {

@ -103,30 +103,36 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
}
return;
}
BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleSuccess(sentPromise, connectionFuture, null);
semaphore.release();
try {
BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleSuccess(sentPromise, connectionFuture, null);
} finally {
semaphore.release();
}
}
@Override
protected void handleError(RFuture<RedisConnection> connectionFuture, Throwable cause) {
if (mainPromise instanceof BatchPromise) {
BatchPromise<R> batchPromise = (BatchPromise<R>) mainPromise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
sentPromise.tryFailure(cause);
mainPromise.tryFailure(cause);
if (executed.compareAndSet(false, true)) {
connectionFuture.getNow().forceFastReconnectAsync().onComplete((res, e) -> {
RedisQueuedBatchExecutor.super.releaseConnection(mainPromise, connectionFuture);
});
try {
if (mainPromise instanceof BatchPromise) {
BatchPromise<R> batchPromise = (BatchPromise<R>) mainPromise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
sentPromise.tryFailure(cause);
mainPromise.tryFailure(cause);
if (executed.compareAndSet(false, true)) {
connectionFuture.getNow().forceFastReconnectAsync().onComplete((res, e) -> {
RedisQueuedBatchExecutor.super.releaseConnection(mainPromise, connectionFuture);
});
}
return;
}
super.handleError(connectionFuture, cause);
} finally {
semaphore.release();
return;
}
super.handleError(connectionFuture, cause);
}
@Override

Loading…
Cancel
Save