ByteBuf should be released once it successfully written to channel. #1018

pull/1025/head
Nikita 8 years ago
parent 75eea5f8bd
commit da3c5df4b9

@ -28,9 +28,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
@ -100,7 +100,11 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<?, ?>> {
}
}
writeArgument(out, encoder.encode(param));
ByteBuf buf = encoder.encode(param);
writeArgument(out, buf);
if (!(param instanceof ByteBuf)) {
buf.release();
}
i++;
}
@ -157,8 +161,7 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<?, ?>> {
out.writeByte(BYTES_PREFIX);
out.writeBytes(convert(arg.readableBytes()));
out.writeBytes(CRLF);
out.writeBytes(arg);
arg.release();
out.writeBytes(arg, arg.readerIndex(), arg.readableBytes());
out.writeBytes(CRLF);
}

@ -86,7 +86,7 @@ public class AsyncDetails<V, R> {
this.exception = exception;
this.timeout = timeout;
}
public ChannelFuture getWriteFuture() {
return writeFuture;
}
@ -143,6 +143,9 @@ public class AsyncDetails<V, R> {
public int getAttempt() {
return attempt;
}
public void incAttempt() {
attempt++;
}

@ -63,6 +63,7 @@ import org.redisson.misc.RedissonObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -470,10 +471,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt) {
if (mainPromise.isCancelled()) {
free(params);
return;
}
if (!connectionManager.getShutdownLatch().acquire()) {
free(params);
mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
return;
}
@ -490,6 +493,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
} catch (Exception e) {
connectionManager.getShutdownLatch().release();
free(params);
mainPromise.tryFailure(e);
return;
}
@ -518,8 +522,17 @@ public class CommandAsyncService implements CommandAsyncExecutor {
connectionManager.getShutdownLatch().release();
} else {
if (details.getConnectionFuture().isSuccess()) {
ChannelFuture writeFuture = details.getWriteFuture();
if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) {
if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
return;
}
details.incAttempt();
Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
return;
}
if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) {
return;
}
}
@ -529,6 +542,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (details.getAttemptPromise().cancel(false)) {
AsyncDetails.release(details);
}
free(details);
return;
}
@ -537,6 +551,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setException(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + LogHelper.toString(details.getParams())));
}
details.getAttemptPromise().tryFailure(details.getException());
free(details);
return;
}
if (!details.getAttemptPromise().cancel(false)) {
@ -551,6 +566,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
AsyncDetails.release(details);
}
};
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
@ -562,13 +578,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (connFuture.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
details.setException(convertException(connectionFuture));
return;
}
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {
releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
return;
@ -611,12 +627,24 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
}
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (details.getAttemptPromise().isDone() || future.isCancelled()) {
return;
protected void free(final Object[] params) {
for (Object obj : params) {
if (obj instanceof ByteBuf) {
((ByteBuf)obj).release();
}
}
}
protected <V, R> void free(final AsyncDetails<V, R> details) {
for (Object obj : details.getParams()) {
if (obj instanceof ByteBuf) {
((ByteBuf)obj).release();
}
}
}
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException(
"Can't write command: " + details.getCommand() + ", params: " + LogHelper.toString(details.getParams()) + " to channel: " + future.channel(), future.cause()));
@ -624,7 +652,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
details.getTimeout().cancel();
free(details);
long timeoutTime = connectionManager.getConfig().getTimeout();
if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) {
Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
@ -725,7 +754,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (!connectionFuture.isSuccess()) {
return;
}
RedisConnection connection = connectionFuture.getNow();
connectionManager.getShutdownLatch().release();
if (isReadOnly) {
@ -803,6 +832,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else {
details.getMainPromise().tryFailure(future.cause());
}
AsyncDetails.release(details);
}

Loading…
Cancel
Save