refactoring

pull/3438/head
Nikita Koksharov 4 years ago
parent 22a0d28715
commit de64949aff

@ -26,10 +26,12 @@ import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -125,7 +127,8 @@ public class RedissonPatternTopic implements RPatternTopic {
entry.removeListener(channelName, listenerId);
if (!entry.hasListeners(channelName)) {
subscribeService.punsubscribe(channelName, semaphore);
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore)
.onComplete(new TransferListener<>(result));
} else {
semaphore.release();
result.trySuccess(null);
@ -136,21 +139,7 @@ public class RedissonPatternTopic implements RPatternTopic {
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
return;
}
entry.removeListener(channelName, listenerId);
if (!entry.hasListeners(channelName)) {
subscribeService.punsubscribe(channelName, semaphore);
} else {
semaphore.release();
}
removeListenerAsync(listenerId).syncUninterruptibly();
}
@Override
@ -165,7 +154,7 @@ public class RedissonPatternTopic implements RPatternTopic {
}
if (entry.hasListeners(channelName)) {
subscribeService.punsubscribe(channelName, semaphore);
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
}
@ -184,7 +173,7 @@ public class RedissonPatternTopic implements RPatternTopic {
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
subscribeService.punsubscribe(channelName, semaphore);
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
}

@ -26,6 +26,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise;
@ -145,7 +146,7 @@ public class RedissonTopic implements RTopic {
}
if (entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly();
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
}
@ -189,7 +190,7 @@ public class RedissonTopic implements RTopic {
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore)
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore)
.onComplete(new TransferListener<Void>(promise));
} else {
semaphore.release();
@ -219,7 +220,7 @@ public class RedissonTopic implements RTopic {
entry.removeListener(channelName, id);
}
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore)
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore)
.onComplete(new TransferListener<Void>(promise));
} else {
semaphore.release();
@ -245,7 +246,7 @@ public class RedissonTopic implements RTopic {
entry.removeListener(channelName, id);
}
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly();
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
}

@ -53,7 +53,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
if (!removed) {
throw new IllegalStateException();
}
service.unsubscribe(new ChannelName(channelName), semaphore);
service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName), semaphore);
} else {
semaphore.release();
}

@ -237,7 +237,7 @@ public class PublishSubscribeService {
connEntry.removeListener(channelName, listener);
}
if (!connEntry.hasListeners(channelName)) {
unsubscribe(channelName, lock);
unsubscribe(type, channelName, lock);
} else {
lock.release();
}
@ -332,7 +332,7 @@ public class PublishSubscribeService {
});
}
public RFuture<Void> unsubscribe(ChannelName channelName, AsyncSemaphore lock) {
public RFuture<Void> unsubscribe(PubSubType topicType, ChannelName channelName, AsyncSemaphore lock) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null || connectionManager.isShuttingDown()) {
lock.release();
@ -340,12 +340,12 @@ public class PublishSubscribeService {
}
AtomicBoolean executed = new AtomicBoolean();
RedissonPromise<Void> result = new RedissonPromise<Void>();
ChannelFuture future = entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
RedissonPromise<Void> result = new RedissonPromise<>();
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
if (type == topicType && channel.equals(channelName)) {
executed.set(true);
if (entry.release() == 1) {
@ -359,25 +359,26 @@ public class PublishSubscribeService {
return false;
}
});
};
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
return;
}
ChannelFuture future;
if (topicType == PubSubType.UNSUBSCRIBE) {
future = entry.unsubscribe(channelName, listener);
} else {
future = entry.punsubscribe(channelName, listener);
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (executed.get()) {
return;
}
entry.getConnection().onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channelName));
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
return;
}
connectionManager.newTimeout(timeout -> {
if (executed.get()) {
return;
}
entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName));
}, config.getTimeout(), TimeUnit.MILLISECONDS);
});
return result;
@ -468,32 +469,6 @@ public class PublishSubscribeService {
return result;
}
public void punsubscribe(ChannelName channelName, AsyncSemaphore lock) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null || connectionManager.isShuttingDown()) {
lock.release();
return;
}
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
if (entry.release() == 1) {
addFreeConnectionEntry(channelName, entry);
}
lock.release();
return true;
}
return false;
}
});
}
private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {
int slot = connectionManager.calcSlot(channelName.getName());
MasterSlaveEntry me = connectionManager.getEntry(slot);

Loading…
Cancel
Save