From bf84090f63dde95b51914ed954191b46703a4424 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 14 Jun 2018 13:30:55 +0300 Subject: [PATCH] Added test for connection leak after error --- .../redisson/command/CommandAsyncService.java | 8 ++--- .../redisson/command/CommandBatchService.java | 26 +++++++------- .../java/org/redisson/RedissonBatchTest.java | 34 +++++++++++++++++-- 3 files changed, 50 insertions(+), 18 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index ed5a6623a..72859930d 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -857,18 +857,18 @@ public class CommandAsyncService implements CommandAsyncExecutor { handleSuccess(details.getMainPromise(), details.getCommand(), res); } else { - handleError(details.getMainPromise(), future.cause()); + handleError(details, details.getMainPromise(), future.cause()); } AsyncDetails.release(details); } catch (RuntimeException e) { - handleError(details.getMainPromise(), e); + handleError(details, details.getMainPromise(), e); throw e; } } - protected void handleError(RPromise promise, Throwable cause) { - promise.tryFailure(cause); + protected void handleError(AsyncDetails details, RPromise mainPromise, Throwable cause) { + mainPromise.tryFailure(cause); } protected void handleSuccess(RPromise promise, RedisCommand command, R res) { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 1c4961eb2..09a2d4655 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -224,17 +224,25 @@ public class CommandBatchService extends CommandAsyncService { } @Override - protected void handleError(RPromise promise, Throwable cause) { + protected void handleError(final AsyncDetails details, RPromise promise, Throwable cause) { if (isRedisBasedQueue() && promise instanceof BatchPromise) { BatchPromise batchPromise = (BatchPromise) promise; RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); - super.handleError(sentPromise, cause); - super.handleError(promise, cause); + sentPromise.tryFailure(cause); + promise.tryFailure(cause); + if (executed.compareAndSet(false, true)) { + details.getConnectionFuture().getNow().forceFastReconnectAsync().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + CommandBatchService.super.releaseConnection(details.getSource(), details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise(), details); + } + }); + } semaphore.release(); return; } - super.handleError(promise, cause); + super.handleError(details, promise, cause); } @Override @@ -279,18 +287,12 @@ public class CommandBatchService extends CommandAsyncService { List> list = new LinkedList>(); if (options.isSkipResult()) { -// BatchCommandData offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); -// entry.getCommands().addFirst(offCommand); -// list.add(offCommand); list.add(new CommandData(new RedissonPromise(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "OFF" })); } list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); if (options.isSkipResult()) { -// BatchCommandData onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet()); -// entry.getCommands().add(onCommand); -// list.add(onCommand); list.add(new CommandData(new RedissonPromise(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "ON" })); } if (options.getSyncSlaves() > 0) { @@ -339,9 +341,9 @@ public class CommandBatchService extends CommandAsyncService { RFuture connectionFuture; if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { - connectionFuture = connectionManager.connectionWriteOp(source, command); + connectionFuture = connectionManager.connectionWriteOp(source, null); } else { - connectionFuture = connectionManager.connectionReadOp(source, command); + connectionFuture = connectionManager.connectionReadOp(source, null); } connectionFuture.syncUninterruptibly(); entry.setConnectionFuture(connectionFuture); diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 58cbf99b3..de96b8898 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -3,7 +3,6 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -29,7 +28,6 @@ import org.redisson.api.RBatch; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; import org.redisson.api.RMapAsync; -import org.redisson.api.RMapCache; import org.redisson.api.RMapCacheAsync; import org.redisson.api.RScript; import org.redisson.api.RScript.Mode; @@ -81,6 +79,38 @@ public class RedissonBatchTest extends BaseTest { System.out.println(t); } + @Test + public void testConnectionLeakAfterError() throws InterruptedException { + Config config = createConfig(); + config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1); + + RedissonClient redisson = Redisson.create(config); + + BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); + RBatch batch = redisson.createBatch(batchOptions); + for (int i = 0; i < 200000; i++) { + batch.getBucket("test").setAsync(123); + } + + try { + batch.execute(); + Assert.fail(); + } catch (Exception e) { + // skip + } + + redisson.getBucket("test3").set(4); + assertThat(redisson.getBucket("test3").get()).isEqualTo(4); + + batch = redisson.createBatch(batchOptions); + batch.getBucket("test1").setAsync(1); + batch.getBucket("test2").setAsync(2); + batch.execute(); + + assertThat(redisson.getBucket("test1").get()).isEqualTo(1); + assertThat(redisson.getBucket("test2").get()).isEqualTo(2); + } + @Test public void testBigRequestAtomic() { batchOptions