Fixed - RTopic.onSubscribe should be invoked after failover process

pull/860/head
Nikita 8 years ago
parent 15ed7c8a87
commit 7453033a32

@ -64,9 +64,9 @@ public interface ConnectionManager {
boolean isShuttingDown();
RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener);
RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners);
RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore);
RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);
ConnectionInitializer getConnectListener();
@ -106,9 +106,9 @@ public interface ConnectionManager {
PubSubConnectionEntry getPubSubEntry(String channelName);
RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener);
RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?>... listeners);
RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener, AsyncSemaphore semaphore);
RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);
Codec unsubscribe(String channelName, AsyncSemaphore lock);

@ -364,41 +364,41 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?> listener) {
public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = newPromise();
lock.acquire(new Runnable() {
@Override
public void run() {
RFuture<PubSubConnectionEntry> future = psubscribe(channelName, codec, listener, lock);
RFuture<PubSubConnectionEntry> future = psubscribe(channelName, codec, lock, listeners);
future.addListener(new TransferListener<PubSubConnectionEntry>(result));
}
});
return result;
}
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore);
subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
return promise;
}
public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener) {
public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = newPromise();
lock.acquire(new Runnable() {
@Override
public void run() {
RFuture<PubSubConnectionEntry> future = subscribe(codec, channelName, listener, lock);
RFuture<PubSubConnectionEntry> future = subscribe(codec, channelName, lock, listeners);
future.addListener(new TransferListener<PubSubConnectionEntry>(result));
}
});
return result;
}
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore);
subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise;
}
@ -406,18 +406,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return locks[Math.abs(channelName.hashCode() % locks.length)];
}
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
private void subscribe(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
if (connEntry != null) {
connEntry.addListener(channelName, listener);
connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(connEntry);
}
});
subscribe(channelName, promise, type, lock, connEntry, listeners);
return;
}
@ -431,7 +424,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, listener, promise, type, lock);
connect(codec, channelName, promise, type, lock, listeners);
return;
}
@ -445,14 +438,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
freeEntry.release();
freePubSubLock.release();
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(oldEntry);
}
});
subscribe(channelName, promise, type, lock, oldEntry, listeners);
return;
}
@ -461,14 +447,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
freePubSubLock.release();
freeEntry.addListener(channelName, listener);
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(freeEntry);
}
});
subscribe(channelName, promise, type, lock, freeEntry, listeners);
if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName);
@ -480,8 +459,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
});
}
private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
private void subscribe(final String channelName, final RPromise<PubSubConnectionEntry> promise,
final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry,
final RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.addListener(channelName, listener);
}
connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(connEntry);
}
});
}
private void connect(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final int slot = calcSlot(channelName);
RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@ -505,29 +499,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
releaseSubscribeConnection(slot, entry);
freePubSubLock.release();
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(oldEntry);
}
});
subscribe(channelName, promise, type, lock, oldEntry, listeners);
return;
}
freePubSubConnections.add(entry);
freePubSubLock.release();
entry.addListener(channelName, listener);
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(entry);
}
});
subscribe(channelName, promise, type, lock, entry, listeners);
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);

@ -211,21 +211,15 @@ public class MasterSlaveEntry {
private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
subscribe(channelName, listeners, subscribeCodec);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
if (future.isSuccess()) {
log.debug("resubscribed listeners of '{}' channel to {}", channelName, future.getNow().getConnection().getRedisClient());
}
log.debug("resubscribed listeners of '{}' channel to {}", channelName, newEntry.getConnection().getRedisClient());
}
});
}

@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.PubSubMessageListener;
import org.redisson.PubSubPatternMessageListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;

@ -98,7 +98,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener);

@ -6,15 +6,15 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
@ -22,6 +22,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RSet;
import org.redisson.api.RTopic;
@ -463,7 +464,7 @@ public class RedissonTopicTest {
}
@Test
public void testReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
public void testReattach() throws Exception {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
@ -475,14 +476,24 @@ public class RedissonTopicTest {
RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RTopic<Integer> topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(new MessageListener<Integer>() {
@Override
public void onMessage(String channel, Integer msg) {
if (msg == 1) {
executed.set(true);
}
executed.set(true);
}
});
@ -498,10 +509,77 @@ public class RedissonTopicTest {
redisson.getTopic("topic").publish(1);
await().atMost(5, TimeUnit.SECONDS).untilTrue(executed);
await().atMost(2, TimeUnit.SECONDS).untilTrue(executed);
await().atMost(2, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
redisson.shutdown();
runner.stop();
}
@Test
public void testReattachInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();
RTopic<Integer> topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(new MessageListener<Integer>() {
@Override
public void onMessage(String channel, Integer msg) {
executed.set(true);
}
});
process.getNodes().stream().filter(x -> Arrays.asList(slave1.getPort(), slave2.getPort(), slave3.getPort()).contains(x.getRedisServerPort()))
.forEach(x -> {
try {
x.stop();
Thread.sleep(18000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
Thread.sleep(15000);
redisson.getTopic("topic").publish(1);
await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
Assert.assertTrue(executed.get());
redisson.shutdown();
process.shutdown();
}
}

Loading…
Cancel
Save