Timeout cancellation. #123

pull/126/head
Nikita 10 years ago
parent 2cfb173e59
commit f83e4b8bd3

@ -102,19 +102,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.codec = new RedisCodecWrapper(cfg.getCodec()); this.codec = new RedisCodecWrapper(cfg.getCodec());
} }
public <T> FutureListener<T> createReleaseWriteListener(final int slot, final RedisConnection conn) { public <T> FutureListener<T> createReleaseWriteListener(final int slot,
final RedisConnection conn, final Timeout timeout) {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception { public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
timeout.cancel();
releaseWrite(slot, conn); releaseWrite(slot, conn);
} }
}; };
} }
public <T> FutureListener<T> createReleaseReadListener(final int slot, final RedisConnection conn) { public <T> FutureListener<T> createReleaseReadListener(final int slot,
final RedisConnection conn, final Timeout timeout) {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception { public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
timeout.cancel();
releaseRead(slot, conn); releaseRead(slot, conn);
} }
}; };
@ -156,8 +160,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
asyncOperation.execute(promise, async); asyncOperation.execute(promise, async);
ex.set(new RedisTimeoutException()); ex.set(new RedisTimeoutException());
timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
promise.addListener(createReleaseWriteListener(slot, connection)); promise.addListener(createReleaseWriteListener(slot, connection, timeout));
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
ex.set(e); ex.set(e);
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS); timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);
@ -228,8 +232,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
asyncOperation.execute(promise, async); asyncOperation.execute(promise, async);
ex.set(new RedisTimeoutException()); ex.set(new RedisTimeoutException());
timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
promise.addListener(createReleaseWriteListener(slot, connection)); promise.addListener(createReleaseWriteListener(slot, connection, timeout));
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
ex.set(e); ex.set(e);
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS); timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);
@ -421,8 +425,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
asyncOperation.execute(promise, async); asyncOperation.execute(promise, async);
ex.set(new RedisTimeoutException()); ex.set(new RedisTimeoutException());
timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS); Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);
promise.addListener(createReleaseReadListener(slot, connection)); promise.addListener(createReleaseReadListener(slot, connection, timeout));
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
ex.set(e); ex.set(e);
timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS); timer.newTimeout(timerTask, config.getRetryInterval(), TimeUnit.MILLISECONDS);

Loading…
Cancel
Save