diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java index 7e64619bc..6dba012f7 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -28,9 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.CharsetUtil; @@ -100,7 +100,11 @@ public class CommandEncoder extends MessageToByteEncoder> { } } - writeArgument(out, encoder.encode(param)); + ByteBuf buf = encoder.encode(param); + writeArgument(out, buf); + if (!(param instanceof ByteBuf)) { + buf.release(); + } i++; } @@ -157,8 +161,7 @@ public class CommandEncoder extends MessageToByteEncoder> { out.writeByte(BYTES_PREFIX); out.writeBytes(convert(arg.readableBytes())); out.writeBytes(CRLF); - out.writeBytes(arg); - arg.release(); + out.writeBytes(arg, arg.readerIndex(), arg.readableBytes()); out.writeBytes(CRLF); } diff --git a/redisson/src/main/java/org/redisson/command/AsyncDetails.java b/redisson/src/main/java/org/redisson/command/AsyncDetails.java index f1aaf3f17..0dd90b35a 100644 --- a/redisson/src/main/java/org/redisson/command/AsyncDetails.java +++ b/redisson/src/main/java/org/redisson/command/AsyncDetails.java @@ -86,7 +86,7 @@ public class AsyncDetails { this.exception = exception; this.timeout = timeout; } - + public ChannelFuture getWriteFuture() { return writeFuture; } @@ -143,6 +143,9 @@ public class AsyncDetails { public int getAttempt() { return attempt; } + public void incAttempt() { + attempt++; + } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index fb18a6f56..101cd6348 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -63,6 +63,7 @@ import org.redisson.misc.RedissonObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -470,10 +471,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { protected void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt) { if (mainPromise.isCancelled()) { + free(params); return; } if (!connectionManager.getShutdownLatch().acquire()) { + free(params); mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown")); return; } @@ -490,6 +493,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } catch (Exception e) { connectionManager.getShutdownLatch().release(); + free(params); mainPromise.tryFailure(e); return; } @@ -518,8 +522,17 @@ public class CommandAsyncService implements CommandAsyncExecutor { connectionManager.getShutdownLatch().release(); } else { if (details.getConnectionFuture().isSuccess()) { - ChannelFuture writeFuture = details.getWriteFuture(); - if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) { + if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { + if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) { + return; + } + details.incAttempt(); + Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + details.setTimeout(timeout); + return; + } + + if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) { return; } } @@ -529,6 +542,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (details.getAttemptPromise().cancel(false)) { AsyncDetails.release(details); } + free(details); return; } @@ -537,6 +551,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setException(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + LogHelper.toString(details.getParams()))); } details.getAttemptPromise().tryFailure(details.getException()); + free(details); return; } if (!details.getAttemptPromise().cancel(false)) { @@ -551,6 +566,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count); AsyncDetails.release(details); } + }; Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); @@ -562,13 +578,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (connFuture.isCancelled()) { return; } - + if (!connFuture.isSuccess()) { connectionManager.getShutdownLatch().release(); details.setException(convertException(connectionFuture)); return; } - + if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) { releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details); return; @@ -611,12 +627,24 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); } - private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { - ChannelFuture future = details.getWriteFuture(); - if (details.getAttemptPromise().isDone() || future.isCancelled()) { - return; + protected void free(final Object[] params) { + for (Object obj : params) { + if (obj instanceof ByteBuf) { + ((ByteBuf)obj).release(); + } } + } + protected void free(final AsyncDetails details) { + for (Object obj : details.getParams()) { + if (obj instanceof ByteBuf) { + ((ByteBuf)obj).release(); + } + } + } + + private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { + ChannelFuture future = details.getWriteFuture(); if (!future.isSuccess()) { details.setException(new WriteRedisConnectionException( "Can't write command: " + details.getCommand() + ", params: " + LogHelper.toString(details.getParams()) + " to channel: " + future.channel(), future.cause())); @@ -624,7 +652,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { } details.getTimeout().cancel(); - + free(details); + long timeoutTime = connectionManager.getConfig().getTimeout(); if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) { Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString()); @@ -725,7 +754,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (!connectionFuture.isSuccess()) { return; } - + RedisConnection connection = connectionFuture.getNow(); connectionManager.getShutdownLatch().release(); if (isReadOnly) { @@ -803,6 +832,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } else { details.getMainPromise().tryFailure(future.cause()); } + AsyncDetails.release(details); }