Fixed - different topics subscribed to the same Redis node in Cluster.

pull/2954/head
Nikita Koksharov 5 years ago
parent 671a63ba0c
commit 9fa6ce3ac7

@ -16,6 +16,7 @@
package org.redisson.pubsub;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -47,32 +48,34 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
/**
*
*
* @author Nikita Koksharov
*
*/
public class PublishSubscribeService {
private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
private final ConnectionManager connectionManager;
private final MasterSlaveServersConfig config;
private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<MasterSlaveEntry, Queue<PubSubConnectionEntry>> freePubSubMap = new ConcurrentHashMap<>();
private final Queue<PubSubConnectionEntry> emptyQueue = new LinkedList<>();
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
private final LockPubSub lockPubSub = new LockPubSub(this);
public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super();
this.connectionManager = connectionManager;
@ -81,15 +84,15 @@ public class PublishSubscribeService {
locks[i] = new AsyncSemaphore(1);
}
}
public LockPubSub getLockPubSub() {
return lockPubSub;
}
public CountDownLatchPubSub getCountDownLatchPubSub() {
return countDownLatchPubSub;
}
public SemaphorePubSub getSemaphorePubSub() {
return semaphorePubSub;
}
@ -101,7 +104,7 @@ public class PublishSubscribeService {
public RFuture<PubSubConnectionEntry> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
}
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, new ChannelName(channelName), promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
@ -122,7 +125,7 @@ public class PublishSubscribeService {
lock.release();
return;
}
RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
promise.onComplete((res, e) -> {
if (e != null) {
@ -134,7 +137,7 @@ public class PublishSubscribeService {
promise.tryFailure(e);
return;
}
promise.trySuccess(res);
});
subscribe(codec, channelName, result, type, lock, listeners);
@ -142,7 +145,7 @@ public class PublishSubscribeService {
});
return promise;
}
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
@ -152,8 +155,8 @@ public class PublishSubscribeService {
public AsyncSemaphore getSemaphore(ChannelName channelName) {
return locks[Math.abs(channelName.hashCode() % locks.length)];
}
private void subscribe(Codec codec, ChannelName channelName,
private void subscribe(Codec codec, ChannelName channelName,
RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
if (connEntry != null) {
@ -170,41 +173,43 @@ public class PublishSubscribeService {
freePubSubLock.release();
return;
}
Queue<PubSubConnectionEntry> freePubSubConnections = getConnectionsQueue(channelName);
PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, promise, type, lock, listeners);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = freeEntry.psubscribe(codec, channelName);
} else {
future = freeEntry.subscribe(codec, channelName);
}
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -214,7 +219,7 @@ public class PublishSubscribeService {
}
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -224,10 +229,16 @@ public class PublishSubscribeService {
}
});
}
});
}
private Queue<PubSubConnectionEntry> getConnectionsQueue(ChannelName channelName) {
int slot = connectionManager.calcSlot(channelName.getName());
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return freePubSubMap.getOrDefault(entry, emptyQueue);
}
private RFuture<Void> addListeners(ChannelName channelName, RPromise<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
RedisPubSubListener<?>... listeners) {
@ -236,7 +247,7 @@ public class PublishSubscribeService {
}
SubscribeListener list = connEntry.getSubscribeFuture(channelName, type);
RFuture<Void> subscribeFuture = list.getSuccessFuture();
subscribeFuture.onComplete((res, e) -> {
if (!promise.trySuccess(connEntry)) {
for (RedisPubSubListener<?> listener : listeners) {
@ -263,7 +274,7 @@ public class PublishSubscribeService {
entry.returnPubSubConnection(pubSubEntry);
}
}
private RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
MasterSlaveEntry entry = connectionManager.getEntry(slot);
if (entry == null) {
@ -272,7 +283,7 @@ public class PublishSubscribeService {
}
return entry.nextPubSubConnection();
}
private void connect(Codec codec, ChannelName channelName,
RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
int slot = connectionManager.calcSlot(channelName.getName());
@ -282,41 +293,41 @@ public class PublishSubscribeService {
((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e);
}
});
connFuture.onComplete((conn, e) -> {
if (e != null) {
connFuture.onComplete((conn, ex) -> {
if (ex != null) {
freePubSubLock.release();
lock.release();
promise.tryFailure(e);
promise.tryFailure(ex);
return;
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
int remainFreeAmount = entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount > 0) {
freePubSubConnections.add(entry);
addFreeConnectionEntry(channelName, entry);
}
freePubSubLock.release();
RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = entry.psubscribe(codec, channelName);
} else {
future = entry.subscribe(codec, channelName);
}
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -326,7 +337,7 @@ public class PublishSubscribeService {
}
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -337,43 +348,43 @@ public class PublishSubscribeService {
});
});
}
public RFuture<Void> unsubscribe(ChannelName channelName, AsyncSemaphore lock) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null || connectionManager.isShuttingDown()) {
lock.release();
return RedissonPromise.newSucceededFuture(null);
}
AtomicBoolean executed = new AtomicBoolean();
RedissonPromise<Void> result = new RedissonPromise<Void>();
ChannelFuture future = entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
executed.set(true);
if (entry.release() == 1) {
freePubSubConnections.add(entry);
addFreeConnectionEntry(channelName, entry);
}
lock.release();
result.trySuccess(null);
return true;
}
return false;
}
});
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -385,10 +396,10 @@ public class PublishSubscribeService {
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
});
return result;
}
public RFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
if (connectionManager.isShuttingDown()) {
return RedissonPromise.newSucceededFuture(null);
@ -409,16 +420,17 @@ public class PublishSubscribeService {
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
Queue<PubSubConnectionEntry> freePubSubConnections = getConnectionsQueue(channelName);
freePubSubConnections.remove(entry);
freePubSubLock.release();
Codec entryCodec;
if (topicType == PubSubType.PUNSUBSCRIBE) {
entryCodec = entry.getConnection().getPatternChannels().get(channelName);
} else {
entryCodec = entry.getConnection().getChannels().get(channelName);
}
AtomicBoolean executed = new AtomicBoolean();
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@ -426,7 +438,7 @@ public class PublishSubscribeService {
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == topicType && channel.equals(channelName)) {
executed.set(true);
lock.release();
result.trySuccess(entryCodec);
return true;
@ -442,14 +454,14 @@ public class PublishSubscribeService {
} else {
future = entry.unsubscribe(channelName, listener);
}
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -465,50 +477,59 @@ public class PublishSubscribeService {
});
}
});
return result;
}
public void punsubscribe(ChannelName channelName, AsyncSemaphore lock) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null || connectionManager.isShuttingDown()) {
lock.release();
return;
}
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
addFreeConnectionEntry(channelName, entry);
}
lock.release();
return true;
}
return false;
}
});
}
private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {
int slot = connectionManager.calcSlot(channelName.getName());
MasterSlaveEntry me = connectionManager.getEntry(slot);
Queue<PubSubConnectionEntry> freePubSubConnections = freePubSubMap.computeIfAbsent(me, e -> new ConcurrentLinkedQueue<>());
freePubSubConnections.add(entry);
}
public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
for (PubSubConnectionEntry entry : freePubSubConnections) {
if (entry.getConnection().equals(redisPubSubConnection)) {
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
freePubSubConnections.remove(entry);
freePubSubLock.release();
}
});
break;
for (Queue<PubSubConnectionEntry> queue : freePubSubMap.values()) {
for (PubSubConnectionEntry entry : queue) {
if (entry.getConnection().equals(redisPubSubConnection)) {
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
queue.remove(entry);
freePubSubLock.release();
}
});
break;
}
}
}
for (ChannelName channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
@ -527,12 +548,12 @@ public class PublishSubscribeService {
if (listeners.isEmpty()) {
return;
}
subscribeCodecFuture.onComplete((subscribeCodec, e) -> {
if (subscribeCodec == null) {
return;
}
if (topicType == PubSubType.PUNSUBSCRIBE) {
psubscribe(channelName, listeners, subscribeCodec);
} else {
@ -549,7 +570,7 @@ public class PublishSubscribeService {
subscribe(channelName, listeners, subscribeCodec);
return;
}
log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, res.getConnection().getRedisClient());
});
}
@ -562,19 +583,14 @@ public class PublishSubscribeService {
psubscribe(channelName, listeners, subscribeCodec);
return;
}
log.info("listeners of '{}' channel-pattern to '{}' have been resubscribed", channelName, res.getConnection().getRedisClient());
});
}
@Override
public String toString() {
return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", freePubSubConnections="
+ freePubSubConnections + "]";
return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", freePubSubMap=" + freePubSubMap + "]";
}
}

@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.After;
import org.junit.AfterClass;
@ -1137,7 +1138,49 @@ public class RedissonTopicTest {
t.start();
return t;
}
@Test
public void testClusterSharding() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) {
int j = i;
RTopic topic = redisson.getTopic("test" + i);
topic.addListener(Integer.class, (c, v) -> {
assertThat(v).isEqualTo(j);
counter.incrementAndGet();
});
}
for (int i = 0; i < 10; i++) {
RTopic topic = redisson.getTopic("test" + i);
topic.publish(i);
}
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> counter.get() == 10);
redisson.shutdown();
process.shutdown();
}
@Test
public void testReattachInClusterSlave() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();

Loading…
Cancel
Save