refactoring

pull/1614/head
Nikita 7 years ago
parent cb004ab8c6
commit 0252bbb573

@ -62,7 +62,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.resolver.AddressResolver; import io.netty.resolver.AddressResolver;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
@ -234,11 +233,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
final MasterSlaveEntry e; final MasterSlaveEntry e;
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(); List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
if (config.checkSkipSlavesInit()) { if (config.checkSkipSlavesInit()) {
e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); e = new SingleEntry(ClusterConnectionManager.this, config);
} else { } else {
config.setSlaveAddresses(partition.getSlaveAddresses()); config.setSlaveAddresses(partition.getSlaveAddresses());
e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); e = new MasterSlaveEntry(ClusterConnectionManager.this, config);
List<RFuture<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses()); List<RFuture<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses());
futures.addAll(fs); futures.addAll(fs);
@ -596,7 +595,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (Integer slot : removedSlots) { for (Integer slot : removedSlots) {
MasterSlaveEntry entry = removeEntry(slot); MasterSlaveEntry entry = removeEntry(slot);
if (entry.getSlotRanges().isEmpty()) { if (entry.getReferences() == 0) {
entry.shutdownAsync(); entry.shutdownAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr()); log.info("{} master and slaves for it removed", entry.getClient().getAddr());
} }
@ -633,9 +632,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) { private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
for (ClusterPartition currentPartition : getLastPartitions()) { for (ClusterPartition currentPartition : getLastPartitions()) {
for (ClusterPartition newPartition : newPartitions) { for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId()) if (!currentPartition.getNodeId().equals(newPartition.getNodeId())) {
// skip master change case
|| !currentPartition.getMasterAddress().equals(newPartition.getMasterAddress())) {
continue; continue;
} }

@ -15,13 +15,11 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -323,14 +321,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void initSingleEntry() { protected void initSingleEntry() {
try { try {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry; MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) { if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(slots, this, config); entry = new SingleEntry(this, config);
} else { } else {
entry = createMasterSlaveEntry(config, slots); entry = createMasterSlaveEntry(config);
} }
RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress()); RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly(); f.syncUninterruptibly();
@ -354,9 +349,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
} }
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) {
HashSet<ClusterSlotRange> slots) { MasterSlaveEntry entry = new MasterSlaveEntry(this, config);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet()); List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
for (RFuture<Void> future : fs) { for (RFuture<Void> future : fs) {
future.syncUninterruptibly(); future.syncUninterruptibly();
@ -403,13 +397,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public RedisClient createClient(NodeType type, URI address, String sslHostname) { public RedisClient createClient(NodeType type, URI address, String sslHostname) {
RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname); RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getTimeout(), sslHostname);
return client; return client;
} }
@Override @Override
public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname) { public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname) {
RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname); RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getTimeout(), sslHostname);
return client; return client;
} }
@ -522,17 +516,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
}); });
} }
protected final void addEntry(Integer slot, MasterSlaveEntry entry) { protected final void addEntry(Integer slot, MasterSlaveEntry entry) {
slot2entry.set(slot, entry); MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry);
entry.addSlotRange(slot); if (oldEntry != entry) {
entry.incReference();
}
client2entry.put(entry.getClient(), entry); client2entry.put(entry.getClient(), entry);
} }
protected final MasterSlaveEntry removeEntry(Integer slot) { protected final MasterSlaveEntry removeEntry(Integer slot) {
MasterSlaveEntry entry = slot2entry.getAndSet(slot, null); MasterSlaveEntry entry = slot2entry.getAndSet(slot, null);
entry.removeSlotRange(slot); if (entry.decReference() == 0) {
if (entry.getSlotRanges().isEmpty()) {
client2entry.remove(entry.getClient()); client2entry.remove(entry.getClient());
} }
return entry; return entry;

@ -18,10 +18,8 @@ package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
@ -32,7 +30,6 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
@ -66,11 +63,12 @@ public class MasterSlaveEntry {
LoadBalancerManager slaveBalancer; LoadBalancerManager slaveBalancer;
ClientConnectionsEntry masterEntry; ClientConnectionsEntry masterEntry;
int references;
final MasterSlaveServersConfig config; final MasterSlaveServersConfig config;
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
final MasterConnectionPool writeConnectionPool; final MasterConnectionPool writeConnectionPool;
final Set<Integer> slots = new HashSet<Integer>();
final MasterPubSubConnectionPool pubSubConnectionPool; final MasterPubSubConnectionPool pubSubConnectionPool;
@ -78,12 +76,7 @@ public class MasterSlaveEntry {
String sslHostname; String sslHostname;
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public MasterSlaveEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
for (ClusterSlotRange clusterSlotRange : slotRanges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.add(i);
}
}
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.config = config; this.config = config;
@ -524,17 +517,17 @@ public class MasterSlaveEntry {
} }
slaveBalancer.returnConnection(connection); slaveBalancer.returnConnection(connection);
} }
public void addSlotRange(Integer range) { public void incReference() {
slots.add(range); references++;
} }
public void removeSlotRange(Integer range) { public int decReference() {
slots.remove(range); return --references;
} }
public Set<Integer> getSlotRanges() { public int getReferences() {
return slots; return references;
} }
@Override @Override

@ -420,9 +420,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
@Override @Override
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) {
HashSet<ClusterSlotRange> slots) { MasterSlaveEntry entry = new MasterSlaveEntry(this, config);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<RFuture<Void>> fs = entry.initSlaveBalancer(disconnectedSlaves); List<RFuture<Void>> fs = entry.initSlaveBalancer(disconnectedSlaves);
for (RFuture<Void> future : fs) { for (RFuture<Void> future : fs) {
future.syncUninterruptibly(); future.syncUninterruptibly();

@ -16,12 +16,10 @@
package org.redisson.connection; package org.redisson.connection;
import java.net.URI; import java.net.URI;
import java.util.Set;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
/** /**
@ -31,8 +29,8 @@ import org.redisson.config.MasterSlaveServersConfig;
*/ */
public class SingleEntry extends MasterSlaveEntry { public class SingleEntry extends MasterSlaveEntry {
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public SingleEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(slotRanges, connectionManager, config); super(connectionManager, config);
} }
@Override @Override

Loading…
Cancel
Save