diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index d56e8e9ef..09613f83b 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -71,7 +71,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.ThreadLocalRandom; @@ -335,7 +334,11 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private CacheKey toCacheKey(Object key) { ByteBuf encoded = encodeMapKey(key); - return toCacheKey(encoded); + try { + return toCacheKey(encoded); + } finally { + encoded.release(); + } } private CacheKey toCacheKey(ByteBuf encodedKey) { diff --git a/redisson/src/main/java/org/redisson/command/AsyncDetails.java b/redisson/src/main/java/org/redisson/command/AsyncDetails.java index 0dd90b35a..7f5d241d7 100644 --- a/redisson/src/main/java/org/redisson/command/AsyncDetails.java +++ b/redisson/src/main/java/org/redisson/command/AsyncDetails.java @@ -29,6 +29,7 @@ import org.redisson.misc.RPromise; import io.netty.channel.ChannelFuture; import io.netty.util.Timeout; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; public class AsyncDetails { @@ -44,7 +45,7 @@ public class AsyncDetails { Object[] params; RPromise mainPromise; int attempt; - + FutureListener mainPromiseListener; private volatile ChannelFuture writeFuture; @@ -147,6 +148,16 @@ public class AsyncDetails { attempt++; } - + public void setupMainPromiseListener(FutureListener mainPromiseListener) { + this.mainPromiseListener = mainPromiseListener; + mainPromise.addListener(mainPromiseListener); + } + + public void removeMainPromiseListener() { + if (mainPromiseListener != null) { + mainPromise.removeListener(mainPromiseListener); + mainPromiseListener = null; + } + } } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 01e0125e4..4be6943a7 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -79,7 +79,7 @@ import io.netty.util.concurrent.FutureListener; */ public class CommandAsyncService implements CommandAsyncExecutor { - private static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class); + static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class); final ConnectionManager connectionManager; protected RedissonClient redisson; @@ -510,6 +510,19 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt); + FutureListener mainPromiseListener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isCancelled() && connectionFuture.cancel(false)) { + log.debug("Connection obtaining canceled for {}", command); + details.getTimeout().cancel(); + if (details.getAttemptPromise().cancel(false)) { + free(params); + } + } + } + }; + final TimerTask retryTimerTask = new TimerTask() { @Override @@ -540,9 +553,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (details.getMainPromise().isCancelled()) { if (details.getAttemptPromise().cancel(false)) { + free(details); AsyncDetails.release(details); } - free(details); return; } @@ -551,7 +564,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"))); } details.getAttemptPromise().tryFailure(details.getException()); - free(details); return; } if (!details.getAttemptPromise().cancel(false)) { @@ -563,6 +575,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { log.debug("attempt {} for command {} and params {}", count, details.getCommand(), Arrays.toString(details.getParams())); } + details.removeMainPromiseListener(); async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count); AsyncDetails.release(details); } @@ -571,7 +584,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); details.setTimeout(timeout); - + details.setupMainPromiseListener(mainPromiseListener); + connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { @@ -645,7 +659,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { ChannelFuture future = details.getWriteFuture(); + if (details.getAttemptPromise().isDone()) { + return; + } + if (!future.isSuccess()) { + log.trace("Can't write {} to {}", details.getCommand(), connection); details.setException(new WriteRedisConnectionException( "Can't write command: " + details.getCommand() + ", params: " + LogHelper.toString(details.getParams()) + " to channel: " + future.channel(), future.cause())); if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) { @@ -656,7 +675,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { } details.getTimeout().cancel(); - free(details); long timeoutTime = connectionManager.getConfig().getTimeout(); if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) { @@ -728,6 +746,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { // handling cancel operation for blocking commands if (future.isCancelled() && !details.getAttemptPromise().isDone()) { + log.debug("Canceled blocking operation {} used {}", details.getCommand(), connection); connection.forceFastReconnectAsync().addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -782,6 +801,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { return; } + details.removeMainPromiseListener(); + if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(), @@ -818,6 +839,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { return; } + free(details); + if (future.isSuccess()) { R res = future.getNow(); if (res instanceof RedisClientResult) { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 8623b5bd4..257e41604 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -239,13 +239,15 @@ public class CommandBatchService extends CommandAsyncService { return promise; } - protected void execute(final Entry entry, final NodeSource source, final RPromise mainPromise, final AtomicInteger slots, + private void execute(final Entry entry, final NodeSource source, final RPromise mainPromise, final AtomicInteger slots, final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval) { if (mainPromise.isCancelled()) { + free(entry); return; } if (!connectionManager.getShutdownLatch().acquire()) { + free(entry); mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown")); return; } @@ -260,10 +262,30 @@ public class CommandBatchService extends CommandAsyncService { } else { connectionFuture = connectionManager.connectionWriteOp(source, null); } + + final int attempts; + if (retryAttempts > 0) { + attempts = retryAttempts; + } else { + attempts = connectionManager.getConfig().getRetryAttempts(); + } + + final FutureListener mainPromiseListener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isCancelled() && connectionFuture.cancel(false)) { + log.debug("Connection obtaining canceled for batch"); + details.getTimeout().cancel(); + if (details.getAttemptPromise().cancel(false)) { + free(entry); + } + } + } + }; final TimerTask retryTimerTask = new TimerTask() { @Override - public void run(Timeout timeout) throws Exception { + public void run(Timeout t) throws Exception { if (attemptPromise.isDone()) { return; } @@ -272,22 +294,29 @@ public class CommandBatchService extends CommandAsyncService { connectionManager.getShutdownLatch().release(); } else { if (connectionFuture.isSuccess()) { - ChannelFuture writeFuture = details.getWriteFuture(); - if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) { + if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { + if (details.getAttempt() == attempts) { + return; + } + details.incAttempt(); + Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + details.setTimeout(timeout); + return; + } + + if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) { return; } } } if (mainPromise.isCancelled()) { - attemptPromise.cancel(false); + if (attemptPromise.cancel(false)) { + free(entry); + } return; } - int attempts = connectionManager.getConfig().getRetryAttempts(); - if (retryAttempts > 0) { - attempts = retryAttempts; - } if (attempt == attempts) { if (details.getException() == null) { details.setException(new RedisTimeoutException("Batch command execution timeout")); @@ -300,6 +329,7 @@ public class CommandBatchService extends CommandAsyncService { } int count = attempt + 1; + mainPromise.removeListener(mainPromiseListener); execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval); } }; @@ -311,11 +341,12 @@ public class CommandBatchService extends CommandAsyncService { Timeout timeout = connectionManager.newTimeout(retryTimerTask, interval, TimeUnit.MILLISECONDS); details.setTimeout(timeout); + mainPromise.addListener(mainPromiseListener); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout); + checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout, attempts); } }); @@ -327,6 +358,8 @@ public class CommandBatchService extends CommandAsyncService { return; } + mainPromise.removeListener(mainPromiseListener); + if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); entry.clearErrors(); @@ -357,7 +390,8 @@ public class CommandBatchService extends CommandAsyncService { return; } - + free(entry); + if (future.isSuccess()) { if (slots.decrementAndGet() == 0) { mainPromise.trySuccess(future.getNow()); @@ -369,36 +403,44 @@ public class CommandBatchService extends CommandAsyncService { }); } - private void checkWriteFuture(final RPromise attemptPromise, AsyncDetails details, - final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout) { - if (attemptPromise.isDone() || future.isCancelled()) { - return; + protected void free(final Entry entry) { + for (BatchCommandData command : entry.getCommands()) { + free(command.getParams()); } + } + private void checkWriteFuture(Entry entry, final RPromise attemptPromise, AsyncDetails details, + final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout, int attempts) { if (!future.isSuccess()) { details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause())); - } else { - details.getTimeout().cancel(); - TimerTask timerTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - attemptPromise.tryFailure( - new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); - } - }; - - long timeout = connectionManager.getConfig().getTimeout(); - if (responseTimeout > 0) { - timeout = responseTimeout; + if (details.getAttempt() == attempts) { + details.getAttemptPromise().tryFailure(details.getException()); + free(entry); + } + return; + } + + details.getTimeout().cancel(); + + TimerTask timerTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + attemptPromise.tryFailure( + new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); } - Timeout timeoutTask = connectionManager.newTimeout(timerTask, timeout, TimeUnit.MILLISECONDS); - details.setTimeout(timeoutTask); + }; + + long timeout = connectionManager.getConfig().getTimeout(); + if (responseTimeout > 0) { + timeout = responseTimeout; } + Timeout timeoutTask = connectionManager.newTimeout(timerTask, timeout, TimeUnit.MILLISECONDS); + details.setTimeout(timeoutTask); } private void checkConnectionFuture(final Entry entry, final NodeSource source, final RPromise mainPromise, final RPromise attemptPromise, final AsyncDetails details, - RFuture connFuture, final boolean noResult, final long responseTimeout) { + RFuture connFuture, final boolean noResult, final long responseTimeout, int attempts) { if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { return; } @@ -430,7 +472,7 @@ public class CommandBatchService extends CommandAsyncService { details.getWriteFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(attemptPromise, details, connection, future, noResult, responseTimeout); + checkWriteFuture(entry, attemptPromise, details, connection, future, noResult, responseTimeout, attempts); } }); diff --git a/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java b/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java index 4a01d0ecb..11765fbef 100644 --- a/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java +++ b/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java @@ -80,7 +80,11 @@ public class DefaultNamingScheme extends AbstractNamingScheme implements NamingS } private static String bytesToHex(ByteBuf bytes) { - return ByteBufUtil.hexDump(bytes); + try { + return ByteBufUtil.hexDump(bytes); + } finally { + bytes.release(); + } } private static byte[] hexToBytes(String s) {