diff --git a/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 9d0a75c59..14646f228 100644 --- a/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -18,22 +18,22 @@ package org.redisson.spring.data.connection; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.ChannelName; import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; import org.springframework.data.redis.connection.DefaultMessage; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.util.AbstractSubscription; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; /** - * + * * @author Nikita Koksharov * */ @@ -41,7 +41,7 @@ public class RedissonSubscription extends AbstractSubscription { private final CommandAsyncExecutor commandExecutor; private final PublishSubscribeService subscribeService; - + public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) { super(listener, null, null); this.commandExecutor = commandExecutor; @@ -51,6 +51,7 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doSubscribe(byte[]... channels) { List> list = new ArrayList<>(); + Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : channels) { CompletableFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() { @Override @@ -63,24 +64,48 @@ public class RedissonSubscription extends AbstractSubscription { DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), m); getListener().onMessage(msg, null); } + + @Override + public boolean onStatus(PubSubType type, CharSequence ch) { + if (!Arrays.equals(((ChannelName) ch).getName(), channel)) { + return false; + } + + if (getListener() instanceof SubscriptionListener) { + subscribed.add(channel); + } + return super.onStatus(type, ch); + } + }); list.add(f); } for (CompletableFuture future : list) { commandExecutor.get(future); } + for (byte[] channel : subscribed) { + ((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1); + } } @Override protected void doUnsubscribe(boolean all, byte[]... channels) { for (byte[] channel : channels) { - subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE); + CompletableFuture f = subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE); + if (getListener() instanceof SubscriptionListener) { + f.whenComplete((r, e) -> { + if (r != null) { + ((SubscriptionListener) getListener()).onChannelUnsubscribed(channel, 1); + } + }); + } } } @Override protected void doPsubscribe(byte[]... patterns) { List> list = new ArrayList<>(); + Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : patterns) { CompletableFuture> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { @Override @@ -93,12 +118,27 @@ public class RedissonSubscription extends AbstractSubscription { DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); getListener().onMessage(msg, ((ChannelName)pattern).getName()); } + + @Override + public boolean onStatus(PubSubType type, CharSequence pattern) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return false; + } + + if (getListener() instanceof SubscriptionListener) { + subscribed.add(channel); + } + return super.onStatus(type, pattern); + } }); list.add(f); } for (CompletableFuture future : list) { commandExecutor.get(future); } + for (byte[] channel : subscribed) { + ((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1); + } } private byte[] toBytes(Object message) { @@ -111,7 +151,14 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPUnsubscribe(boolean all, byte[]... patterns) { for (byte[] pattern : patterns) { - subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE); + CompletableFuture f = subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE); + if (getListener() instanceof SubscriptionListener) { + f.whenComplete((r, e) -> { + if (r != null) { + ((SubscriptionListener) getListener()).onPatternUnsubscribed(pattern, 1); + } + }); + } } } diff --git a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 9d0a75c59..59ee1a4ff 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -18,19 +18,19 @@ package org.redisson.spring.data.connection; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.ChannelName; import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.codec.Codec; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; import org.springframework.data.redis.connection.DefaultMessage; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.util.AbstractSubscription; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; /** * @@ -51,6 +51,7 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doSubscribe(byte[]... channels) { List> list = new ArrayList<>(); + Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : channels) { CompletableFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() { @Override @@ -63,24 +64,48 @@ public class RedissonSubscription extends AbstractSubscription { DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), m); getListener().onMessage(msg, null); } + + @Override + public boolean onStatus(PubSubType type, CharSequence ch) { + if (!Arrays.equals(((ChannelName) ch).getName(), channel)) { + return false; + } + + if (getListener() instanceof SubscriptionListener) { + subscribed.add(channel); + } + return super.onStatus(type, ch); + } + }); list.add(f); } for (CompletableFuture future : list) { commandExecutor.get(future); } + for (byte[] channel : subscribed) { + ((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1); + } } @Override protected void doUnsubscribe(boolean all, byte[]... channels) { for (byte[] channel : channels) { - subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE); + CompletableFuture f = subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE); + if (getListener() instanceof SubscriptionListener) { + f.whenComplete((r, e) -> { + if (r != null) { + ((SubscriptionListener) getListener()).onChannelUnsubscribed(channel, 1); + } + }); + } } } @Override protected void doPsubscribe(byte[]... patterns) { List> list = new ArrayList<>(); + Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : patterns) { CompletableFuture> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { @Override @@ -93,12 +118,27 @@ public class RedissonSubscription extends AbstractSubscription { DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); getListener().onMessage(msg, ((ChannelName)pattern).getName()); } + + @Override + public boolean onStatus(PubSubType type, CharSequence pattern) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return false; + } + + if (getListener() instanceof SubscriptionListener) { + subscribed.add(channel); + } + return super.onStatus(type, pattern); + } }); list.add(f); } for (CompletableFuture future : list) { commandExecutor.get(future); } + for (byte[] channel : subscribed) { + ((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1); + } } private byte[] toBytes(Object message) { @@ -111,7 +151,14 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPUnsubscribe(boolean all, byte[]... patterns) { for (byte[] pattern : patterns) { - subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE); + CompletableFuture f = subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE); + if (getListener() instanceof SubscriptionListener) { + f.whenComplete((r, e) -> { + if (r != null) { + ((SubscriptionListener) getListener()).onPatternUnsubscribed(pattern, 1); + } + }); + } } }