|
|
|
@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReferenceArray;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
import org.redisson.ElementsSubscribeService;
|
|
|
|
@ -128,8 +127,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
|
|
|
|
|
protected MasterSlaveServersConfig config;
|
|
|
|
|
|
|
|
|
|
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);
|
|
|
|
|
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
|
|
|
|
|
private MasterSlaveEntry masterSlaveEntry;
|
|
|
|
|
|
|
|
|
|
private final Promise<Void> shutdownPromise = ImmediateEventExecutor.INSTANCE.newPromise();
|
|
|
|
|
|
|
|
|
@ -330,7 +328,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Collection<MasterSlaveEntry> getEntrySet() {
|
|
|
|
|
return client2entry.values();
|
|
|
|
|
if (masterSlaveEntry != null) {
|
|
|
|
|
return Collections.singletonList(masterSlaveEntry);
|
|
|
|
|
}
|
|
|
|
|
return Collections.emptyList();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void initTimer(MasterSlaveServersConfig config) {
|
|
|
|
@ -369,10 +370,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
|
|
|
|
|
addEntry(slot, entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
masterSlaveEntry = entry;
|
|
|
|
|
|
|
|
|
|
startDNSMonitoring(masterFuture.getNow());
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
stopThreads();
|
|
|
|
@ -502,40 +501,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public MasterSlaveEntry getEntry(InetSocketAddress address) {
|
|
|
|
|
for (MasterSlaveEntry entry : client2entry.values()) {
|
|
|
|
|
InetSocketAddress addr = entry.getClient().getAddr();
|
|
|
|
|
if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) {
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return null;
|
|
|
|
|
return masterSlaveEntry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected MasterSlaveEntry getEntry(RedisURI addr) {
|
|
|
|
|
for (MasterSlaveEntry entry : client2entry.values()) {
|
|
|
|
|
if (RedisURI.compare(entry.getClient().getAddr(), addr)) {
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
if (entry.hasSlave(addr)) {
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return null;
|
|
|
|
|
return masterSlaveEntry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public MasterSlaveEntry getEntry(RedisClient redisClient) {
|
|
|
|
|
MasterSlaveEntry entry = client2entry.get(redisClient);
|
|
|
|
|
if (entry != null) {
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (MasterSlaveEntry mentry : client2entry.values()) {
|
|
|
|
|
if (mentry.hasSlave(redisClient)) {
|
|
|
|
|
return mentry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return null;
|
|
|
|
|
return masterSlaveEntry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -546,39 +521,31 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public MasterSlaveEntry getEntry(int slot) {
|
|
|
|
|
return slot2entry.get(slot);
|
|
|
|
|
return masterSlaveEntry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected void removeClient(RedisClient client) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void addClient(MasterSlaveEntry entry) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected final RFuture<RedisClient> changeMaster(int slot, RedisURI address) {
|
|
|
|
|
final MasterSlaveEntry entry = getEntry(slot);
|
|
|
|
|
final RedisClient oldClient = entry.getClient();
|
|
|
|
|
RFuture<RedisClient> future = entry.changeMaster(address);
|
|
|
|
|
future.onComplete((res, e) -> {
|
|
|
|
|
if (e == null) {
|
|
|
|
|
client2entry.remove(oldClient);
|
|
|
|
|
client2entry.put(entry.getClient(), entry);
|
|
|
|
|
removeClient(oldClient);
|
|
|
|
|
addClient(entry);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
return future;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected final void addEntry(Integer slot, MasterSlaveEntry entry) {
|
|
|
|
|
MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry);
|
|
|
|
|
if (oldEntry != entry) {
|
|
|
|
|
entry.incReference();
|
|
|
|
|
shutdownEntry(oldEntry);
|
|
|
|
|
}
|
|
|
|
|
client2entry.put(entry.getClient(), entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected final void removeEntry(Integer slot) {
|
|
|
|
|
MasterSlaveEntry entry = slot2entry.getAndSet(slot, null);
|
|
|
|
|
shutdownEntry(entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void shutdownEntry(MasterSlaveEntry entry) {
|
|
|
|
|
protected final void shutdownEntry(MasterSlaveEntry entry) {
|
|
|
|
|
if (entry != null && entry.decReference() == 0) {
|
|
|
|
|
client2entry.remove(entry.getClient());
|
|
|
|
|
removeClient(entry.getClient());
|
|
|
|
|
entry.getAllEntries().forEach(e -> entry.nodeDown(e));
|
|
|
|
|
entry.masterDown();
|
|
|
|
|
entry.shutdownAsync();
|
|
|
|
|