ByteBuf should be released properly. #1018

pull/1025/head
Nikita 8 years ago
parent 820579b541
commit 8fccb51dd1

@ -71,7 +71,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.ThreadLocalRandom;
@ -335,7 +334,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private CacheKey toCacheKey(Object key) {
ByteBuf encoded = encodeMapKey(key);
return toCacheKey(encoded);
try {
return toCacheKey(encoded);
} finally {
encoded.release();
}
}
private CacheKey toCacheKey(ByteBuf encodedKey) {

@ -29,6 +29,7 @@ import org.redisson.misc.RPromise;
import io.netty.channel.ChannelFuture;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class AsyncDetails<V, R> {
@ -44,7 +45,7 @@ public class AsyncDetails<V, R> {
Object[] params;
RPromise<R> mainPromise;
int attempt;
FutureListener<R> mainPromiseListener;
private volatile ChannelFuture writeFuture;
@ -147,6 +148,16 @@ public class AsyncDetails<V, R> {
attempt++;
}
public void setupMainPromiseListener(FutureListener<R> mainPromiseListener) {
this.mainPromiseListener = mainPromiseListener;
mainPromise.addListener(mainPromiseListener);
}
public void removeMainPromiseListener() {
if (mainPromiseListener != null) {
mainPromise.removeListener(mainPromiseListener);
mainPromiseListener = null;
}
}
}

@ -79,7 +79,7 @@ import io.netty.util.concurrent.FutureListener;
*/
public class CommandAsyncService implements CommandAsyncExecutor {
private static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
final ConnectionManager connectionManager;
protected RedissonClient redisson;
@ -510,6 +510,19 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.init(connectionFuture, attemptPromise,
readOnlyMode, source, codec, command, params, mainPromise, attempt);
FutureListener<R> mainPromiseListener = new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isCancelled() && connectionFuture.cancel(false)) {
log.debug("Connection obtaining canceled for {}", command);
details.getTimeout().cancel();
if (details.getAttemptPromise().cancel(false)) {
free(params);
}
}
}
};
final TimerTask retryTimerTask = new TimerTask() {
@Override
@ -540,9 +553,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (details.getMainPromise().isCancelled()) {
if (details.getAttemptPromise().cancel(false)) {
free(details);
AsyncDetails.release(details);
}
free(details);
return;
}
@ -551,7 +564,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));
}
details.getAttemptPromise().tryFailure(details.getException());
free(details);
return;
}
if (!details.getAttemptPromise().cancel(false)) {
@ -563,6 +575,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
log.debug("attempt {} for command {} and params {}",
count, details.getCommand(), Arrays.toString(details.getParams()));
}
details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
AsyncDetails.release(details);
}
@ -571,7 +584,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
details.setupMainPromiseListener(mainPromiseListener);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
@ -645,7 +659,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (details.getAttemptPromise().isDone()) {
return;
}
if (!future.isSuccess()) {
log.trace("Can't write {} to {}", details.getCommand(), connection);
details.setException(new WriteRedisConnectionException(
"Can't write command: " + details.getCommand() + ", params: " + LogHelper.toString(details.getParams()) + " to channel: " + future.channel(), future.cause()));
if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
@ -656,7 +675,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
details.getTimeout().cancel();
free(details);
long timeoutTime = connectionManager.getConfig().getTimeout();
if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) {
@ -728,6 +746,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
// handling cancel operation for blocking commands
if (future.isCancelled() && !details.getAttemptPromise().isDone()) {
log.debug("Canceled blocking operation {} used {}", details.getCommand(), connection);
connection.forceFastReconnectAsync().addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
@ -782,6 +801,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return;
}
details.removeMainPromiseListener();
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(),
@ -818,6 +839,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return;
}
free(details);
if (future.isSuccess()) {
R res = future.getNow();
if (res instanceof RedisClientResult) {

@ -239,13 +239,15 @@ public class CommandBatchService extends CommandAsyncService {
return promise;
}
protected void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval) {
if (mainPromise.isCancelled()) {
free(entry);
return;
}
if (!connectionManager.getShutdownLatch().acquire()) {
free(entry);
mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
return;
}
@ -260,10 +262,30 @@ public class CommandBatchService extends CommandAsyncService {
} else {
connectionFuture = connectionManager.connectionWriteOp(source, null);
}
final int attempts;
if (retryAttempts > 0) {
attempts = retryAttempts;
} else {
attempts = connectionManager.getConfig().getRetryAttempts();
}
final FutureListener<Void> mainPromiseListener = new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isCancelled() && connectionFuture.cancel(false)) {
log.debug("Connection obtaining canceled for batch");
details.getTimeout().cancel();
if (details.getAttemptPromise().cancel(false)) {
free(entry);
}
}
}
};
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
public void run(Timeout t) throws Exception {
if (attemptPromise.isDone()) {
return;
}
@ -272,22 +294,29 @@ public class CommandBatchService extends CommandAsyncService {
connectionManager.getShutdownLatch().release();
} else {
if (connectionFuture.isSuccess()) {
ChannelFuture writeFuture = details.getWriteFuture();
if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) {
if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
if (details.getAttempt() == attempts) {
return;
}
details.incAttempt();
Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
return;
}
if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) {
return;
}
}
}
if (mainPromise.isCancelled()) {
attemptPromise.cancel(false);
if (attemptPromise.cancel(false)) {
free(entry);
}
return;
}
int attempts = connectionManager.getConfig().getRetryAttempts();
if (retryAttempts > 0) {
attempts = retryAttempts;
}
if (attempt == attempts) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Batch command execution timeout"));
@ -300,6 +329,7 @@ public class CommandBatchService extends CommandAsyncService {
}
int count = attempt + 1;
mainPromise.removeListener(mainPromiseListener);
execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval);
}
};
@ -311,11 +341,12 @@ public class CommandBatchService extends CommandAsyncService {
Timeout timeout = connectionManager.newTimeout(retryTimerTask, interval, TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
mainPromise.addListener(mainPromiseListener);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout);
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout, attempts);
}
});
@ -327,6 +358,8 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
mainPromise.removeListener(mainPromiseListener);
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors();
@ -357,7 +390,8 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
free(entry);
if (future.isSuccess()) {
if (slots.decrementAndGet() == 0) {
mainPromise.trySuccess(future.getNow());
@ -369,36 +403,44 @@ public class CommandBatchService extends CommandAsyncService {
});
}
private void checkWriteFuture(final RPromise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout) {
if (attemptPromise.isDone() || future.isCancelled()) {
return;
protected void free(final Entry entry) {
for (BatchCommandData<?, ?> command : entry.getCommands()) {
free(command.getParams());
}
}
private void checkWriteFuture(Entry entry, final RPromise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout, int attempts) {
if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause()));
} else {
details.getTimeout().cancel();
TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel()));
}
};
long timeout = connectionManager.getConfig().getTimeout();
if (responseTimeout > 0) {
timeout = responseTimeout;
if (details.getAttempt() == attempts) {
details.getAttemptPromise().tryFailure(details.getException());
free(entry);
}
return;
}
details.getTimeout().cancel();
TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel()));
}
Timeout timeoutTask = connectionManager.newTimeout(timerTask, timeout, TimeUnit.MILLISECONDS);
details.setTimeout(timeoutTask);
};
long timeout = connectionManager.getConfig().getTimeout();
if (responseTimeout > 0) {
timeout = responseTimeout;
}
Timeout timeoutTask = connectionManager.newTimeout(timerTask, timeout, TimeUnit.MILLISECONDS);
details.setTimeout(timeoutTask);
}
private void checkConnectionFuture(final Entry entry, final NodeSource source,
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout) {
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, int attempts) {
if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
return;
}
@ -430,7 +472,7 @@ public class CommandBatchService extends CommandAsyncService {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(attemptPromise, details, connection, future, noResult, responseTimeout);
checkWriteFuture(entry, attemptPromise, details, connection, future, noResult, responseTimeout, attempts);
}
});

@ -80,7 +80,11 @@ public class DefaultNamingScheme extends AbstractNamingScheme implements NamingS
}
private static String bytesToHex(ByteBuf bytes) {
return ByteBufUtil.hexDump(bytes);
try {
return ByteBufUtil.hexDump(bytes);
} finally {
bytes.release();
}
}
private static byte[] hexToBytes(String s) {

Loading…
Cancel
Save