From fac418e9f675262c9c01157a50d18a9ce6b1d0b6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 21 Dec 2015 15:41:18 +0300 Subject: [PATCH] Memory allocation optimization. #338 --- .../org/redisson/command/AsyncDetails.java | 105 +++++- .../redisson/command/CommandAsyncService.java | 322 +++++++++++------- .../redisson/command/CommandBatchService.java | 6 +- .../connection/ClientConnectionsEntry.java | 5 + .../connection/ConnectionManager.java | 3 +- .../org/redisson/connection/NodeSource.java | 2 + 6 files changed, 311 insertions(+), 132 deletions(-) diff --git a/src/main/java/org/redisson/command/AsyncDetails.java b/src/main/java/org/redisson/command/AsyncDetails.java index 6113797f2..79d4b1d0d 100644 --- a/src/main/java/org/redisson/command/AsyncDetails.java +++ b/src/main/java/org/redisson/command/AsyncDetails.java @@ -15,12 +15,37 @@ */ package org.redisson.command; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.decoder.MultiDecoder; +import org.redisson.connection.ConnectionManager; +import org.redisson.connection.NodeSource; import io.netty.channel.ChannelFuture; import io.netty.util.Timeout; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +public class AsyncDetails { + + static final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + + Future connectionFuture; + ConnectionManager connectionManager; + Promise attemptPromise; + boolean readOnlyMode; + NodeSource source; + MultiDecoder messageDecoder; + Codec codec; + RedisCommand command; + Object[] params; + Promise mainPromise; + int attempt; -public class AsyncDetails { private volatile ChannelFuture writeFuture; @@ -28,6 +53,42 @@ public class AsyncDetails { private volatile Timeout timeout; + public AsyncDetails() { + } + + public static AsyncDetails acquire() { +// AsyncDetails result = queue.poll(); +// if (result != null) { +// return result; +// } + + AsyncDetails details = new AsyncDetails(); + return details; + } + + public static void release(AsyncDetails details) { +// queue.add(details); + } + + public void init(Future connectionFuture, + Promise attemptPromise, boolean readOnlyMode, NodeSource source, + Codec codec, RedisCommand command, Object[] params, + Promise mainPromise, int attempt) { + this.connectionFuture = connectionFuture; + this.attemptPromise = attemptPromise; + this.readOnlyMode = readOnlyMode; + this.source = source; + this.messageDecoder = messageDecoder; + this.codec = codec; + this.command = command; + this.params = params; + this.mainPromise = mainPromise; + this.attempt = attempt; + this.writeFuture = writeFuture; + this.exception = exception; + this.timeout = timeout; + } + public ChannelFuture getWriteFuture() { return writeFuture; } @@ -49,4 +110,46 @@ public class AsyncDetails { this.timeout = timeout; } + public Future getConnectionFuture() { + return connectionFuture; + } + + public Promise getAttemptPromise() { + return attemptPromise; + } + + public boolean isReadOnlyMode() { + return readOnlyMode; + } + + public NodeSource getSource() { + return source; + } + + public MultiDecoder getMessageDecoder() { + return messageDecoder; + } + + public Codec getCodec() { + return codec; + } + + public RedisCommand getCommand() { + return command; + } + + public Object[] getParams() { + return params; + } + + public Promise getMainPromise() { + return mainPromise; + } + + public int getAttempt() { + return attempt; + } + + + } diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index c57d17cb8..66e4db2fc 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -38,7 +38,6 @@ import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; @@ -88,7 +87,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { public Future readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(true, new NodeSource(slot, client), null, codec, command, params, mainPromise, 0); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); return mainPromise; } @@ -126,7 +125,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }; for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0); + async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; } @@ -164,7 +163,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); ClusterSlotRange slot = slots.remove(0); - async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, 0); + async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, attemptPromise, 0); } @Override @@ -203,29 +202,38 @@ public class CommandAsyncService implements CommandAsyncExecutor { } }; for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, 0); + async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; } - protected RedisException convertException(Future future) { + public RedisException convertException(Future future) { return future.cause() instanceof RedisException ? (RedisException) future.cause() : new RedisException("Unexpected exception while processing command", future.cause()); } + private NodeSource getNodeSource(String key) { + int slot = connectionManager.calcSlot(key); + NodeSource source = NodeSource.ZERO; + if (slot != 0) { + source = new NodeSource(slot); + } + return source; + } + @Override public Future readAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - int slot = connectionManager.calcSlot(key); - async(true, new NodeSource(slot), null, codec, command, params, mainPromise, 0); + NodeSource source = getNodeSource(key); + async(true, source, codec, command, params, mainPromise, 0); return mainPromise; } @Override public Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0); + async(false, new NodeSource(slot), codec, command, params, mainPromise, 0); return mainPromise; } @@ -236,8 +244,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public Future evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { - int slot = connectionManager.calcSlot(key); - return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params); + NodeSource source = getNodeSource(key); + return evalAsync(source, true, codec, evalCommandType, script, keys, params); } @Override @@ -248,8 +256,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public Future evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { - int slot = connectionManager.calcSlot(key); - return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params); + NodeSource source = getNodeSource(key); + return evalAsync(source, false, codec, evalCommandType, script, keys, params); } @Override @@ -284,7 +292,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, 0); + async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, args.toArray(), promise, 0); } return mainPromise; } @@ -296,7 +304,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(readOnlyMode, nodeSource, null, codec, evalCommandType, args.toArray(), mainPromise, 0); + async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0); return mainPromise; } @@ -308,12 +316,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public Future writeAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - int slot = connectionManager.calcSlot(key); - async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0); + NodeSource source = getNodeSource(key); + async(false, source, codec, command, params, mainPromise, 0); return mainPromise; } - protected void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder messageDecoder, final Codec codec, + protected void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, final RedisCommand command, final Object[] params, final Promise mainPromise, final int attempt) { if (mainPromise.isCancelled()) { return; @@ -326,8 +334,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { final Promise attemptPromise = connectionManager.newPromise(); - final AsyncDetails details = new AsyncDetails(); - final Future connectionFuture; if (readOnlyMode) { connectionFuture = connectionManager.connectionReadOp(source, command); @@ -335,17 +341,22 @@ public class CommandAsyncService implements CommandAsyncExecutor { connectionFuture = connectionManager.connectionWriteOp(source, command); } + final AsyncDetails details = AsyncDetails.acquire(); + details.init(connectionFuture, attemptPromise, + readOnlyMode, source, codec, command, params, mainPromise, attempt); + final TimerTask retryTimerTask = new TimerTask() { + @Override public void run(Timeout t) throws Exception { - if (attemptPromise.isDone()) { + if (details.getAttemptPromise().isDone()) { return; } - if (connectionFuture.cancel(false)) { + if (details.getConnectionFuture().cancel(false)) { connectionManager.getShutdownLatch().release(); } else { - if (connectionFuture.isSuccess()) { + if (details.getConnectionFuture().isSuccess()) { ChannelFuture writeFuture = details.getWriteFuture(); if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) { return; @@ -353,146 +364,203 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } - if (mainPromise.isCancelled()) { - attemptPromise.cancel(false); + if (details.getMainPromise().isCancelled()) { + if (details.getAttemptPromise().cancel(false)) { + AsyncDetails.release(details); + } return; } - if (attempt == connectionManager.getConfig().getRetryAttempts()) { + if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) { if (details.getException() == null) { - details.setException(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params))); + details.setException(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(details.getParams()))); } - attemptPromise.tryFailure(details.getException()); + details.getAttemptPromise().tryFailure(details.getException()); return; } - if (!attemptPromise.cancel(false)) { + if (!details.getAttemptPromise().cancel(false)) { return; } - int count = attempt + 1; - async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, count); + int count = details.getAttempt() + 1; + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count); + AsyncDetails.release(details); } }; Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); details.setTimeout(timeout); - connectionFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { - return; - } - - if (!connFuture.isSuccess()) { - details.setException(convertException(connFuture)); - return; + if (connectionFuture.isDone()) { + checkConnectionFuture(source, details); + } else { + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future connFuture) throws Exception { + checkConnectionFuture(source, details); } + }); + } - final RedisConnection connection = connFuture.getNow(); + if (attemptPromise.isDone()) { + checkAttemptFuture(source, details, attemptPromise); + } else { + attemptPromise.addListener(new FutureListener() { - if (source.getRedirect() == Redirect.ASK) { - List> list = new ArrayList>(2); - Promise promise = connectionManager.newPromise(); - list.add(new CommandData(promise, codec, RedisCommands.ASKING, new Object[] {})); - list.add(new CommandData(attemptPromise, messageDecoder, codec, command, params)); - Promise main = connectionManager.newPromise(); - ChannelFuture future = connection.send(new CommandsData(main, list)); - details.setWriteFuture(future); - } else { - log.debug("aquired connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr()); - ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); - details.setWriteFuture(future); + @Override + public void operationComplete(Future future) throws Exception { + checkAttemptFuture(source, details, future); } + }); + } - details.getWriteFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || future.isCancelled()) { - return; - } - - if (!future.isSuccess()) { - details.setException(new WriteRedisConnectionException( - "Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause())); - return; - } + } - details.getTimeout().cancel(); + private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { + ChannelFuture future = details.getWriteFuture(); + if (details.getAttemptPromise().isDone() || future.isCancelled()) { + return; + } - int timeoutTime = connectionManager.getConfig().getTimeout(); - if (command.getName().equals(RedisCommands.BLPOP_VALUE.getName()) - || command.getName().equals(RedisCommands.BRPOP_VALUE.getName())) { - Integer popTimeout = Integer.valueOf(params[params.length - 1].toString()); - if (popTimeout == 0) { - return; - } - timeoutTime += popTimeout*1000; - } + if (!future.isSuccess()) { + details.setException(new WriteRedisConnectionException( + "Can't write command: " + details.getCommand() + ", params: " + Arrays.toString(details.getParams()) + " to channel: " + future.channel(), future.cause())); + return; + } - final int timeoutAmount = timeoutTime; - TimerTask timeoutTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - attemptPromise.tryFailure( - new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + command - + " with params: " + Arrays.toString(params) + " channel: " + connection.getChannel())); - } - }; - - Timeout timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS); - details.setTimeout(timeout); - } - }); + details.getTimeout().cancel(); - if (readOnlyMode) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection)); - } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection)); - } + int timeoutTime = connectionManager.getConfig().getTimeout(); + if (details.getCommand().getName().equals(RedisCommands.BLPOP_VALUE.getName()) + || details.getCommand().getName().equals(RedisCommands.BRPOP_VALUE.getName())) { + Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); + if (popTimeout == 0) { + return; } - }); + timeoutTime += popTimeout*1000; + } - attemptPromise.addListener(new FutureListener() { + final int timeoutAmount = timeoutTime; + TimerTask timeoutTask = new TimerTask() { @Override - public void operationComplete(Future future) throws Exception { - details.getTimeout().cancel(); - if (future.isCancelled()) { - return; - } + public void run(Timeout timeout) throws Exception { + details.getAttemptPromise().tryFailure( + new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand() + + " with params: " + Arrays.toString(details.getParams()) + " channel: " + connection.getChannel())); + } + }; - if (future.cause() instanceof RedisMovedException) { - RedisMovedException ex = (RedisMovedException)future.cause(); - async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt); - return; - } + Timeout timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS); + details.setTimeout(timeout); + } - if (future.cause() instanceof RedisAskException) { - RedisAskException ex = (RedisAskException)future.cause(); - async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt); - return; + private void checkConnectionFuture(final NodeSource source, + final AsyncDetails details) { + if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) { + return; + } + + if (!details.getConnectionFuture().isSuccess()) { + details.setException(convertException(details.getConnectionFuture())); + return; + } + + final RedisConnection connection = details.getConnectionFuture().getNow(); + + if (details.getSource().getRedirect() == Redirect.ASK) { + List> list = new ArrayList>(2); + Promise promise = connectionManager.newPromise(); + list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[] {})); + list.add(new CommandData(details.getAttemptPromise(), details.getMessageDecoder(), details.getCodec(), details.getCommand(), details.getParams())); + Promise main = connectionManager.newPromise(); + ChannelFuture future = connection.send(new CommandsData(main, list)); + details.setWriteFuture(future); + } else { + log.debug("aquired connection for command {} from slot {} using node {}", details.getCommand(), details.getSource(), connection.getRedisClient().getAddr()); + ChannelFuture future = connection.send(new CommandData(details.getAttemptPromise(), details.getMessageDecoder(), details.getCodec(), details.getCommand(), details.getParams())); + details.setWriteFuture(future); + } + + if (details.getWriteFuture().isDone()) { + checkWriteFuture(details, connection); + } else { + details.getWriteFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + checkWriteFuture(details, connection); } + }); + } - if (future.cause() instanceof RedisLoadingException) { - async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt); - return; + if (details.getAttemptPromise().isDone()) { + releaseConnection(source, details, connection); + } else { + details.getAttemptPromise().addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + releaseConnection(source, details, connection); } + }); + } + } - if (future.isSuccess()) { - R res = future.getNow(); - if (res instanceof RedisClientResult) { - InetSocketAddress addr = source.getAddr(); - if (addr == null) { - addr = connectionFuture.getNow().getRedisClient().getAddr(); - } - ((RedisClientResult)res).setRedisClient(addr); - } - mainPromise.setSuccess(res); - } else { - mainPromise.setFailure(future.cause()); + private void releaseConnection(NodeSource source, AsyncDetails details, RedisConnection connection) { + if (details.getAttemptPromise().isCancelled()) { + return; + } + + connectionManager.getShutdownLatch().release(); + if (details.isReadOnlyMode()) { + connectionManager.releaseRead(source, connection); + } else { + connectionManager.releaseWrite(source, connection); + } + } + + private void checkAttemptFuture(final NodeSource source, final AsyncDetails details, + Future future) { + details.getTimeout().cancel(); + if (future.isCancelled()) { + return; + } + + if (future.cause() instanceof RedisMovedException) { + RedisMovedException ex = (RedisMovedException)future.cause(); + async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(), + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + AsyncDetails.release(details); + return; + } + + if (future.cause() instanceof RedisAskException) { + RedisAskException ex = (RedisAskException)future.cause(); + async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), details.getCodec(), + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + AsyncDetails.release(details); + return; + } + + if (future.cause() instanceof RedisLoadingException) { + async(details.isReadOnlyMode(), source, details.getCodec(), + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + AsyncDetails.release(details); + return; + } + + if (future.isSuccess()) { + R res = future.getNow(); + if (res instanceof RedisClientResult) { + InetSocketAddress addr = source.getAddr(); + if (addr == null) { + addr = details.getConnectionFuture().getNow().getRedisClient().getAddr(); } + ((RedisClientResult)res).setRedisClient(addr); } - }); + details.getMainPromise().setSuccess(res); + } else { + details.getMainPromise().setFailure(future.cause()); + } + AsyncDetails.release(details); } } diff --git a/src/main/java/org/redisson/command/CommandBatchService.java b/src/main/java/org/redisson/command/CommandBatchService.java index 93e4a7dce..c3a3589a5 100644 --- a/src/main/java/org/redisson/command/CommandBatchService.java +++ b/src/main/java/org/redisson/command/CommandBatchService.java @@ -104,7 +104,7 @@ public class CommandBatchService extends CommandReactiveService { } @Override - protected void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder messageDecoder, + protected void async(boolean readOnlyMode, NodeSource nodeSource, Codec codec, RedisCommand command, Object[] params, Promise mainPromise, int attempt) { if (executed) { throw new IllegalStateException("Batch already executed!"); @@ -121,7 +121,7 @@ public class CommandBatchService extends CommandReactiveService { if (!readOnlyMode) { entry.setReadOnlyMode(false); } - entry.getCommands().add(new CommandEntry(new CommandData(mainPromise, messageDecoder, codec, command, params), index.incrementAndGet())); + entry.getCommands().add(new CommandEntry(new CommandData(mainPromise, codec, command, params), index.incrementAndGet())); } public List execute() { @@ -222,7 +222,7 @@ public class CommandBatchService extends CommandReactiveService { final TimerTask retryTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (attemptPromise.isDone() || mainPromise.isDone()) { + if (attemptPromise.isDone()) { return; } diff --git a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 31ef28d2b..25453c6fa 100644 --- a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -172,6 +172,11 @@ public class ClientConnectionsEntry { } private void addFireEventListener(Promise connectionFuture) { + if (connectionFuture.isSuccess()) { + connectionManager.getConnectionEventsHub().fireConnect(connectionFuture.getNow().getRedisClient().getAddr()); + return; + } + connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 760221dd2..d52ab2142 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -27,7 +27,6 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.command.AsyncDetails; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; @@ -85,8 +84,10 @@ public interface ConnectionManager { Future connectionWriteOp(NodeSource source, RedisCommand command); + @Deprecated FutureListener createReleaseReadListener(NodeSource source, RedisConnection conn); + @Deprecated FutureListener createReleaseWriteListener(NodeSource source, RedisConnection conn); RedisClient createClient(String host, int port, int timeout); diff --git a/src/main/java/org/redisson/connection/NodeSource.java b/src/main/java/org/redisson/connection/NodeSource.java index ee65405f9..920ef8068 100644 --- a/src/main/java/org/redisson/connection/NodeSource.java +++ b/src/main/java/org/redisson/connection/NodeSource.java @@ -19,6 +19,8 @@ import java.net.InetSocketAddress; public class NodeSource { + public static final NodeSource ZERO = new NodeSource(0); + public enum Redirect {MOVED, ASK} private final Integer slot;