|
|
|
@ -18,22 +18,22 @@ package org.redisson.spring.data.connection;
|
|
|
|
|
import org.redisson.client.BaseRedisPubSubListener;
|
|
|
|
|
import org.redisson.client.ChannelName;
|
|
|
|
|
import org.redisson.client.codec.ByteArrayCodec;
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
|
import org.redisson.client.protocol.pubsub.PubSubType;
|
|
|
|
|
import org.redisson.command.CommandAsyncExecutor;
|
|
|
|
|
import org.redisson.pubsub.PubSubConnectionEntry;
|
|
|
|
|
import org.redisson.pubsub.PublishSubscribeService;
|
|
|
|
|
import org.springframework.data.redis.connection.DefaultMessage;
|
|
|
|
|
import org.springframework.data.redis.connection.MessageListener;
|
|
|
|
|
import org.springframework.data.redis.connection.SubscriptionListener;
|
|
|
|
|
import org.springframework.data.redis.connection.util.AbstractSubscription;
|
|
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
|
*
|
|
|
|
|
* @author Nikita Koksharov
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
@ -41,7 +41,7 @@ public class RedissonSubscription extends AbstractSubscription {
|
|
|
|
|
|
|
|
|
|
private final CommandAsyncExecutor commandExecutor;
|
|
|
|
|
private final PublishSubscribeService subscribeService;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
|
|
|
|
|
super(listener, null, null);
|
|
|
|
|
this.commandExecutor = commandExecutor;
|
|
|
|
@ -51,6 +51,7 @@ public class RedissonSubscription extends AbstractSubscription {
|
|
|
|
|
@Override
|
|
|
|
|
protected void doSubscribe(byte[]... channels) {
|
|
|
|
|
List<CompletableFuture<?>> list = new ArrayList<>();
|
|
|
|
|
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
|
|
|
|
|
for (byte[] channel : channels) {
|
|
|
|
|
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
|
|
|
|
|
@Override
|
|
|
|
@ -63,24 +64,48 @@ public class RedissonSubscription extends AbstractSubscription {
|
|
|
|
|
DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), m);
|
|
|
|
|
getListener().onMessage(msg, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean onStatus(PubSubType type, CharSequence ch) {
|
|
|
|
|
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (getListener() instanceof SubscriptionListener) {
|
|
|
|
|
subscribed.add(channel);
|
|
|
|
|
}
|
|
|
|
|
return super.onStatus(type, ch);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
list.add(f);
|
|
|
|
|
}
|
|
|
|
|
for (CompletableFuture<?> future : list) {
|
|
|
|
|
commandExecutor.get(future);
|
|
|
|
|
}
|
|
|
|
|
for (byte[] channel : subscribed) {
|
|
|
|
|
((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void doUnsubscribe(boolean all, byte[]... channels) {
|
|
|
|
|
for (byte[] channel : channels) {
|
|
|
|
|
subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE);
|
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE);
|
|
|
|
|
if (getListener() instanceof SubscriptionListener) {
|
|
|
|
|
f.whenComplete((r, e) -> {
|
|
|
|
|
if (r != null) {
|
|
|
|
|
((SubscriptionListener) getListener()).onChannelUnsubscribed(channel, 1);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void doPsubscribe(byte[]... patterns) {
|
|
|
|
|
List<CompletableFuture<?>> list = new ArrayList<>();
|
|
|
|
|
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
|
|
|
|
|
for (byte[] channel : patterns) {
|
|
|
|
|
CompletableFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
|
|
|
|
|
@Override
|
|
|
|
@ -93,12 +118,27 @@ public class RedissonSubscription extends AbstractSubscription {
|
|
|
|
|
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
|
|
|
|
|
getListener().onMessage(msg, ((ChannelName)pattern).getName());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean onStatus(PubSubType type, CharSequence pattern) {
|
|
|
|
|
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (getListener() instanceof SubscriptionListener) {
|
|
|
|
|
subscribed.add(channel);
|
|
|
|
|
}
|
|
|
|
|
return super.onStatus(type, pattern);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
list.add(f);
|
|
|
|
|
}
|
|
|
|
|
for (CompletableFuture<?> future : list) {
|
|
|
|
|
commandExecutor.get(future);
|
|
|
|
|
}
|
|
|
|
|
for (byte[] channel : subscribed) {
|
|
|
|
|
((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private byte[] toBytes(Object message) {
|
|
|
|
@ -111,7 +151,14 @@ public class RedissonSubscription extends AbstractSubscription {
|
|
|
|
|
@Override
|
|
|
|
|
protected void doPUnsubscribe(boolean all, byte[]... patterns) {
|
|
|
|
|
for (byte[] pattern : patterns) {
|
|
|
|
|
subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE);
|
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE);
|
|
|
|
|
if (getListener() instanceof SubscriptionListener) {
|
|
|
|
|
f.whenComplete((r, e) -> {
|
|
|
|
|
if (r != null) {
|
|
|
|
|
((SubscriptionListener) getListener()).onPatternUnsubscribed(pattern, 1);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|