Fixed - entry shutdown during cluster slots migration check #5553

pull/5929/head
Nikita Koksharov 8 months ago
parent a489ffd73f
commit be794b0c60

@ -265,6 +265,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
shutdownEntry(entry);
}
private void removeEntry(Integer slot, MasterSlaveEntry entry) {
if (slot2entry.compareAndSet(slot, entry, null)) {
shutdownEntry(entry);
}
}
private void shutdownEntry(MasterSlaveEntry entry) {
if (entry != null && entry.decReference() == 0) {
entry.getAllEntries().forEach(e -> {
@ -337,6 +343,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (Integer slot : partition.getSlots()) {
addEntry(slot, entry);
lastPartitions.put(slot, partition);
partition.incReference();
}
if (partition.getSlotsAmount() > 0) {
lastUri2Partition.put(partition.getMasterAddress(), partition);
@ -691,18 +698,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
.filter(s -> newPartitions.stream().noneMatch(p -> p.hasSlot(s)))
.collect(Collectors.toSet());
for (Integer removedSlot : removedSlots) {
ClusterPartition p = lastPartitions.remove(removedSlot);
if (p != null) {
for (Integer slot : removedSlots) {
ClusterPartition p = lastPartitions.remove(slot);
if (p != null && p.decReference() == 0) {
lastUri2Partition.remove(p.getMasterAddress());
}
removeEntry(slot);
}
if (!removedSlots.isEmpty()) {
log.info("{} slots found to remove", removedSlots.size());
}
for (Integer slot : removedSlots) {
removeEntry(slot);
log.info("{} slots removed", removedSlots.size());
}
Integer addedSlots = 0;
@ -716,13 +720,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (entry != null) {
addEntry(slot, entry);
lastPartitions.put(slot, clusterPartition);
clusterPartition.incReference();
lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition);
addedSlots++;
}
}
}
if (addedSlots > 0) {
log.info("{} slots found to add", addedSlots);
log.info("{} slots added", addedSlots);
}
}
@ -749,6 +754,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
addedSlots.stream().forEach(slot -> {
addEntry(slot, entry);
lastPartitions.put(slot, currentPartition);
currentPartition.incReference();
changedSlots.add(slot);
});
if (!addedSlots.isEmpty()) {
@ -761,8 +767,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
removedSlots.stream().forEach(slot -> {
if (lastPartitions.remove(slot, currentPartition)) {
lastUri2Partition.remove(currentPartition.getMasterAddress());
removeEntry(slot);
if (currentPartition.decReference() == 0) {
lastUri2Partition.remove(currentPartition.getMasterAddress());
}
removeEntry(slot, entry);
changedSlots.add(slot);
}
});

@ -43,6 +43,8 @@ public class ClusterPartition {
private Set<ClusterSlotRange> slotRanges = Collections.emptySet();
private ClusterPartition parent;
private int references;
public ClusterPartition(String nodeId) {
super();
@ -140,7 +142,14 @@ public class ClusterPartition {
slaveAddresses.remove(uri);
failedSlaves.remove(uri);
}
public void incReference() {
references++;
}
public int decReference() {
return --references;
}
@Override
public int hashCode() {
return Objects.hash(nodeId);

Loading…
Cancel
Save