Added test for connection leak after error

pull/1499/head
Nikita 7 years ago
parent 1133f4a1dc
commit bf84090f63

@ -857,18 +857,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
handleSuccess(details.getMainPromise(), details.getCommand(), res); handleSuccess(details.getMainPromise(), details.getCommand(), res);
} else { } else {
handleError(details.getMainPromise(), future.cause()); handleError(details, details.getMainPromise(), future.cause());
} }
AsyncDetails.release(details); AsyncDetails.release(details);
} catch (RuntimeException e) { } catch (RuntimeException e) {
handleError(details.getMainPromise(), e); handleError(details, details.getMainPromise(), e);
throw e; throw e;
} }
} }
protected <R> void handleError(RPromise<R> promise, Throwable cause) { protected <V, R> void handleError(AsyncDetails<V, R> details, RPromise<R> mainPromise, Throwable cause) {
promise.tryFailure(cause); mainPromise.tryFailure(cause);
} }
protected <R> void handleSuccess(RPromise<R> promise, RedisCommand<?> command, R res) { protected <R> void handleSuccess(RPromise<R> promise, RedisCommand<?> command, R res) {

@ -224,17 +224,25 @@ public class CommandBatchService extends CommandAsyncService {
} }
@Override @Override
protected <R> void handleError(RPromise<R> promise, Throwable cause) { protected <V, R> void handleError(final AsyncDetails<V, R> details, RPromise<R> promise, Throwable cause) {
if (isRedisBasedQueue() && promise instanceof BatchPromise) { if (isRedisBasedQueue() && promise instanceof BatchPromise) {
BatchPromise<R> batchPromise = (BatchPromise<R>) promise; BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise(); RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleError(sentPromise, cause); sentPromise.tryFailure(cause);
super.handleError(promise, cause); promise.tryFailure(cause);
if (executed.compareAndSet(false, true)) {
details.getConnectionFuture().getNow().forceFastReconnectAsync().addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
CommandBatchService.super.releaseConnection(details.getSource(), details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise(), details);
}
});
}
semaphore.release(); semaphore.release();
return; return;
} }
super.handleError(promise, cause); super.handleError(details, promise, cause);
} }
@Override @Override
@ -279,18 +287,12 @@ public class CommandBatchService extends CommandAsyncService {
List<CommandData<?, ?>> list = new LinkedList<CommandData<?, ?>>(); List<CommandData<?, ?>> list = new LinkedList<CommandData<?, ?>>();
if (options.isSkipResult()) { if (options.isSkipResult()) {
// BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
// entry.getCommands().addFirst(offCommand);
// list.add(offCommand);
list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "OFF" })); list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "OFF" }));
} }
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
if (options.isSkipResult()) { if (options.isSkipResult()) {
// BatchCommandData<?, ?> onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
// entry.getCommands().add(onCommand);
// list.add(onCommand);
list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "ON" })); list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "ON" }));
} }
if (options.getSyncSlaves() > 0) { if (options.getSyncSlaves() > 0) {
@ -339,9 +341,9 @@ public class CommandBatchService extends CommandAsyncService {
RFuture<RedisConnection> connectionFuture; RFuture<RedisConnection> connectionFuture;
if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
connectionFuture = connectionManager.connectionWriteOp(source, command); connectionFuture = connectionManager.connectionWriteOp(source, null);
} else { } else {
connectionFuture = connectionManager.connectionReadOp(source, command); connectionFuture = connectionManager.connectionReadOp(source, null);
} }
connectionFuture.syncUninterruptibly(); connectionFuture.syncUninterruptibly();
entry.setConnectionFuture(connectionFuture); entry.setConnectionFuture(connectionFuture);

@ -3,7 +3,6 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -29,7 +28,6 @@ import org.redisson.api.RBatch;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RListAsync; import org.redisson.api.RListAsync;
import org.redisson.api.RMapAsync; import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCache;
import org.redisson.api.RMapCacheAsync; import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScript; import org.redisson.api.RScript;
import org.redisson.api.RScript.Mode; import org.redisson.api.RScript.Mode;
@ -81,6 +79,38 @@ public class RedissonBatchTest extends BaseTest {
System.out.println(t); System.out.println(t);
} }
@Test
public void testConnectionLeakAfterError() throws InterruptedException {
Config config = createConfig();
config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);
RedissonClient redisson = Redisson.create(config);
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 200000; i++) {
batch.getBucket("test").setAsync(123);
}
try {
batch.execute();
Assert.fail();
} catch (Exception e) {
// skip
}
redisson.getBucket("test3").set(4);
assertThat(redisson.getBucket("test3").get()).isEqualTo(4);
batch = redisson.createBatch(batchOptions);
batch.getBucket("test1").setAsync(1);
batch.getBucket("test2").setAsync(2);
batch.execute();
assertThat(redisson.getBucket("test1").get()).isEqualTo(1);
assertThat(redisson.getBucket("test2").get()).isEqualTo(2);
}
@Test @Test
public void testBigRequestAtomic() { public void testBigRequestAtomic() {
batchOptions batchOptions

Loading…
Cancel
Save