refactoring

pull/4879/head
Nikita Koksharov 2 years ago
parent 8080b66518
commit 5d00206cf4

@ -15,20 +15,18 @@
*/
package org.redisson;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.RFuture;
import org.redisson.api.RTopic;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
*

@ -24,7 +24,6 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.*;
import org.redisson.executor.params.*;
import org.redisson.misc.CompletableFutureWrapper;

@ -177,7 +177,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return client2entry.values();
}
protected MasterSlaveEntry getEntry(RedisURI addr) {
@Override
public MasterSlaveEntry getEntry(RedisURI addr) {
for (MasterSlaveEntry entry : client2entry.values()) {
if (addr.equals(entry.getClient().getAddr())) {
return entry;
@ -229,7 +230,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
@Override
public MasterSlaveEntry getEntry(int slot) {
protected MasterSlaveEntry getEntry(int slot) {
return slot2entry.get(slot);
}

@ -74,34 +74,28 @@ public class BaseRedisBatchExecutor<V, R> extends RedisExecutor<V, R> {
}
}
protected final MasterSlaveEntry getEntry(NodeSource source) {
protected final MasterSlaveEntry getEntry() {
if (source.getSlot() != null) {
MasterSlaveEntry entry = connectionManager.getEntry(source.getSlot());
entry = connectionManager.getWriteEntry(source.getSlot());
if (entry == null) {
throw connectionManager.getServiceManager().createNodeNotFoundException(source);
}
return entry;
}
return source.getEntry();
entry = source.getEntry();
return entry;
}
protected final void addBatchCommandData(Object[] batchParams) {
MasterSlaveEntry msEntry = getEntry(source);
Entry entry = commands.get(msEntry);
if (entry == null) {
entry = new Entry();
Entry oldEntry = commands.putIfAbsent(msEntry, entry);
if (oldEntry != null) {
entry = oldEntry;
}
}
MasterSlaveEntry msEntry = getEntry();
Entry entry = commands.computeIfAbsent(msEntry, k -> new Entry());
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}
Codec codecToUse = getCodec(codec);
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codecToUse, command, batchParams, index.incrementAndGet());
BatchCommandData<V, R> commandData = new BatchCommandData<>(mainPromise, codecToUse, command, batchParams, index.incrementAndGet());
entry.getCommands().add(commandData);
}

@ -562,7 +562,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Map<MasterSlaveEntry, Map<Integer, List<String>>> entry2keys = Arrays.stream(keys).collect(
Collectors.groupingBy(k -> {
int slot = connectionManager.calcSlot(k);
return connectionManager.getEntry(slot);
return connectionManager.getWriteEntry(slot);
}, Collectors.groupingBy(k -> {
return connectionManager.calcSlot(k);
}, Collectors.toList())));

@ -32,6 +32,7 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.liveobject.core.RedissonObjectBuilder;
@ -74,6 +75,7 @@ public class RedisExecutor<V, R> {
CompletableFuture<RedisConnection> connectionFuture;
NodeSource source;
MasterSlaveEntry entry;
Codec codec;
volatile int attempt;
volatile Optional<Timeout> timeout = Optional.empty();
@ -121,7 +123,7 @@ public class RedisExecutor<V, R> {
codec = getCodec(codec);
CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();
CompletableFuture<RedisConnection> connectionFuture = getConnection();
CompletableFuture<R> attemptPromise = new CompletableFuture<>();
mainPromiseListener = (r, e) -> {
@ -642,9 +644,9 @@ public class RedisExecutor<V, R> {
private void release(RedisConnection connection) {
if (readOnlyMode) {
connectionManager.releaseRead(source, connection);
entry.releaseRead(connection);
} else {
connectionManager.releaseWrite(source, connection);
entry.releaseWrite(connection);
}
}
@ -654,9 +656,9 @@ public class RedisExecutor<V, R> {
protected CompletableFuture<RedisConnection> getConnection() {
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
connectionFuture = connectionReadOp(command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
connectionFuture = connectionWriteOp(command);
}
return connectionFuture;
}
@ -715,5 +717,57 @@ public class RedisExecutor<V, R> {
return new RedisException("Unexpected exception while processing command", cause);
}
final CompletableFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
entry = getEntry(true);
if (entry == null) {
CompletableFuture<RedisConnection> f = new CompletableFuture<>();
f.completeExceptionally(connectionManager.getServiceManager().createNodeNotFoundException(source));
return f;
}
if (source.getRedirect() != null) {
return entry.connectionReadOp(command, source.getAddr());
}
if (source.getRedisClient() != null) {
return entry.connectionReadOp(command, source.getRedisClient());
}
return entry.connectionReadOp(command);
}
final CompletableFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
entry = getEntry(false);
if (entry == null) {
CompletableFuture<RedisConnection> f = new CompletableFuture<>();
f.completeExceptionally(connectionManager.getServiceManager().createNodeNotFoundException(source));
return f;
}
// fix for https://github.com/redisson/redisson/issues/1548
if (source.getRedirect() != null
&& !source.getAddr().equals(entry.getClient().getAddr())
&& entry.hasSlave(source.getAddr())) {
return entry.redirectedConnectionWriteOp(command, source.getAddr());
}
return entry.connectionWriteOp(command);
}
private MasterSlaveEntry getEntry(boolean read) {
if (source.getRedirect() != null) {
return connectionManager.getEntry(source.getAddr());
}
MasterSlaveEntry entry = source.getEntry();
if (source.getRedisClient() != null) {
entry = connectionManager.getEntry(source.getRedisClient());
}
if (entry == null && source.getSlot() != null) {
if (read) {
entry = connectionManager.getReadEntry(source.getSlot());
} else {
entry = connectionManager.getWriteEntry(source.getSlot());
}
}
return entry;
}
}

@ -104,7 +104,7 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
protected void handleError(CompletableFuture<RedisConnection> connectionFuture, Throwable cause) {
if (mainPromise instanceof BatchPromise) {
BatchPromise<R> batchPromise = (BatchPromise<R>) mainPromise;
CompletableFuture sentPromise = batchPromise.getSentPromise();
CompletableFuture<?> sentPromise = batchPromise.getSentPromise();
sentPromise.completeExceptionally(cause);
mainPromise.completeExceptionally(cause);
if (executed.compareAndSet(false, true)) {
@ -120,7 +120,7 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
@Override
protected void sendCommand(CompletableFuture<R> attemptPromise, RedisConnection connection) {
MasterSlaveEntry msEntry = getEntry(source);
MasterSlaveEntry msEntry = getEntry();
ConnectionEntry connectionEntry = connections.get(msEntry);
boolean syncSlaves = options.getSyncSlaves() > 0;
@ -165,14 +165,14 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
list.add(new CommandData<>(new CompletableFuture<>(), codec, RedisCommands.CLIENT_REPLY, new Object[]{"ON"}));
}
if (options.getSyncSlaves() > 0) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT,
BatchCommandData<?, ?> waitCommand = new BatchCommandData<>(RedisCommands.WAIT,
new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet());
list.add(waitCommand);
entry.getCommands().add(waitCommand);
}
CompletableFuture<Void> main = new CompletableFuture<>();
writeFuture = connection.send(new CommandsData(main, list, new ArrayList(entry.getCommands()),
writeFuture = connection.send(new CommandsData(main, list, new ArrayList<>(entry.getCommands()),
options.isSkipResult(), false, true, syncSlaves));
} else {
CompletableFuture<Void> main = new CompletableFuture<>();
@ -186,16 +186,8 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
@Override
protected CompletableFuture<RedisConnection> getConnection() {
MasterSlaveEntry msEntry = getEntry(source);
ConnectionEntry entry = connections.get(msEntry);
if (entry == null) {
entry = new ConnectionEntry();
ConnectionEntry oldEntry = connections.putIfAbsent(msEntry, entry);
if (oldEntry != null) {
entry = oldEntry;
}
}
MasterSlaveEntry msEntry = getEntry();
ConnectionEntry entry = connections.computeIfAbsent(msEntry, k -> new ConnectionEntry());
if (entry.getConnectionFuture() != null) {
connectionFuture = entry.getConnectionFuture();
@ -209,9 +201,9 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
}
if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
connectionFuture = connectionManager.connectionWriteOp(source, null);
connectionFuture = connectionWriteOp(null);
} else {
connectionFuture = connectionManager.connectionReadOp(source, null);
connectionFuture = connectionReadOp(null);
}
connectionFuture.toCompletableFuture().join();
entry.setConnectionFuture(connectionFuture);

@ -263,6 +263,7 @@ public class Config {
*
* @return ConnectionManager
*/
@Deprecated
ConnectionManager getConnectionManager() {
return connectionManager;
}
@ -274,6 +275,7 @@ public class Config {
* manager.
* @param connectionManager for supply
*/
@Deprecated
public void useCustomServers(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@ -182,8 +182,6 @@ public class ConfigSupport {
public static ConnectionManager createConnectionManager(Config configCopy) {
ServiceManager serviceManager = new ServiceManager(configCopy);
UUID id = UUID.randomUUID();
ConnectionManager cm = null;
if (configCopy.getMasterSlaveServersConfig() != null) {
validate(configCopy.getMasterSlaveServersConfig());

@ -17,14 +17,11 @@ package org.redisson.connection;
import org.redisson.api.NodeType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.misc.RedisURI;
import org.redisson.pubsub.PublishSubscribeService;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
@ -50,17 +47,13 @@ public interface ConnectionManager {
MasterSlaveEntry getEntry(String name);
MasterSlaveEntry getEntry(int slot);
MasterSlaveEntry getWriteEntry(int slot);
MasterSlaveEntry getEntry(InetSocketAddress address);
void releaseRead(NodeSource source, RedisConnection connection);
MasterSlaveEntry getReadEntry(int slot);
void releaseWrite(NodeSource source, RedisConnection connection);
CompletableFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command);
MasterSlaveEntry getEntry(InetSocketAddress address);
CompletableFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
MasterSlaveEntry getEntry(RedisURI addr);
RedisClient createClient(NodeType type, InetSocketAddress address, RedisURI uri, String sslHostname);

@ -17,7 +17,6 @@ package org.redisson.connection;
import org.redisson.api.NodeType;
import org.redisson.client.*;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.BaseConfig;
import org.redisson.config.BaseMasterSlaveServersConfig;
@ -320,7 +319,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return masterSlaveEntry;
}
protected MasterSlaveEntry getEntry(RedisURI addr) {
@Override
public MasterSlaveEntry getEntry(RedisURI addr) {
return masterSlaveEntry;
}
@ -335,86 +335,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return getEntry(slot);
}
@Override
public MasterSlaveEntry getEntry(int slot) {
protected MasterSlaveEntry getEntry(int slot) {
return masterSlaveEntry;
}
protected CompletableFuture<RedisClient> changeMaster(int slot, RedisURI address) {
MasterSlaveEntry entry = getEntry(slot);
return entry.changeMaster(address);
}
@Override
public CompletableFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = getEntry(source);
if (entry == null) {
CompletableFuture<RedisConnection> f = new CompletableFuture<>();
f.completeExceptionally(serviceManager.createNodeNotFoundException(source));
return f;
}
// fix for https://github.com/redisson/redisson/issues/1548
if (source.getRedirect() != null
&& !source.getAddr().equals(entry.getClient().getAddr())
&& entry.hasSlave(source.getAddr())) {
return entry.redirectedConnectionWriteOp(command, source.getAddr());
}
return entry.connectionWriteOp(command);
}
private MasterSlaveEntry getEntry(NodeSource source) {
if (source.getRedirect() != null) {
return getEntry(source.getAddr());
}
MasterSlaveEntry entry = source.getEntry();
if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient());
}
if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot());
}
return entry;
}
@Override
public CompletableFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = getEntry(source);
if (entry == null) {
CompletableFuture<RedisConnection> f = new CompletableFuture<>();
f.completeExceptionally(serviceManager.createNodeNotFoundException(source));
return f;
}
if (source.getRedirect() != null) {
return entry.connectionReadOp(command, source.getAddr());
}
if (source.getRedisClient() != null) {
return entry.connectionReadOp(command, source.getRedisClient());
}
return entry.connectionReadOp(command);
public MasterSlaveEntry getWriteEntry(int slot) {
return getEntry(slot);
}
@Override
public void releaseWrite(NodeSource source, RedisConnection connection) {
MasterSlaveEntry entry = getEntry(source);
if (entry == null) {
log.error("Node: {} can't be found", source);
} else {
entry.releaseWrite(connection);
}
public MasterSlaveEntry getReadEntry(int slot) {
return getEntry(slot);
}
@Override
public void releaseRead(NodeSource source, RedisConnection connection) {
MasterSlaveEntry entry = getEntry(source);
if (entry == null) {
log.error("Node: {} can't be found", source);
} else {
entry.releaseRead(connection);
}
protected CompletableFuture<RedisClient> changeMaster(int slot, RedisURI address) {
MasterSlaveEntry entry = getEntry(slot);
return entry.changeMaster(address);
}
@Override

@ -372,7 +372,7 @@ public class PublishSubscribeService {
private MasterSlaveEntry getEntry(ChannelName channelName) {
int slot = connectionManager.calcSlot(channelName.getName());
return connectionManager.getEntry(slot);
return connectionManager.getWriteEntry(slot);
}
private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,

Loading…
Cancel
Save