diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 60f476881..71016decc 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -102,19 +102,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.codec = new RedisCodecWrapper(cfg.getCodec()); } - public FutureListener createReleaseWriteListener(final int slot, final RedisConnection conn) { + public FutureListener createReleaseWriteListener(final int slot, + final RedisConnection conn, final Timeout timeout) { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + timeout.cancel(); releaseWrite(slot, conn); } }; } - public FutureListener createReleaseReadListener(final int slot, final RedisConnection conn) { + public FutureListener createReleaseReadListener(final int slot, + final RedisConnection conn, final Timeout timeout) { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + timeout.cancel(); releaseRead(slot, conn); } }; @@ -156,8 +160,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { asyncOperation.execute(promise, async); ex.set(new RedisTimeoutException()); - timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); - promise.addListener(createReleaseWriteListener(slot, connection)); + Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); + promise.addListener(createReleaseWriteListener(slot, connection, timeout)); } catch (RedisConnectionException e) { ex.set(e); timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS); @@ -228,8 +232,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { asyncOperation.execute(promise, async); ex.set(new RedisTimeoutException()); - timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); - promise.addListener(createReleaseWriteListener(slot, connection)); + Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); + promise.addListener(createReleaseWriteListener(slot, connection, timeout)); } catch (RedisConnectionException e) { ex.set(e); timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS); @@ -421,8 +425,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { asyncOperation.execute(promise, async); ex.set(new RedisTimeoutException()); - timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); - promise.addListener(createReleaseReadListener(slot, connection)); + Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); + promise.addListener(createReleaseReadListener(slot, connection, timeout)); } catch (RedisConnectionException e) { ex.set(e); timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);