|
|
@ -18,8 +18,10 @@ package org.redisson.pubsub;
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
|
|
|
|
|
|
import org.redisson.PubSubEntry;
|
|
|
|
import org.redisson.PubSubEntry;
|
|
|
|
|
|
|
|
import org.redisson.client.BaseRedisPubSubListener;
|
|
|
|
import org.redisson.client.RedisPubSubListener;
|
|
|
|
import org.redisson.client.RedisPubSubListener;
|
|
|
|
import org.redisson.client.codec.LongCodec;
|
|
|
|
import org.redisson.client.codec.LongCodec;
|
|
|
|
|
|
|
|
import org.redisson.client.protocol.pubsub.PubSubType;
|
|
|
|
import org.redisson.connection.ConnectionManager;
|
|
|
|
import org.redisson.connection.ConnectionManager;
|
|
|
|
|
|
|
|
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
@ -72,5 +74,35 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
|
|
|
|
|
|
|
|
|
|
|
|
protected abstract E createEntry(Promise<E> newPromise);
|
|
|
|
protected abstract E createEntry(Promise<E> newPromise);
|
|
|
|
|
|
|
|
|
|
|
|
protected abstract RedisPubSubListener<Long> createListener(String channelName, E value);
|
|
|
|
protected abstract void onMessage(E value, Long message);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private RedisPubSubListener<Long> createListener(final String channelName, final E value) {
|
|
|
|
|
|
|
|
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public void onMessage(String channel, Long message) {
|
|
|
|
|
|
|
|
if (!channelName.equals(channel)) {
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
PublishSubscribe.this.onMessage(value, message);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public boolean onStatus(PubSubType type, String channel) {
|
|
|
|
|
|
|
|
if (!channelName.equals(channel)) {
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (type == PubSubType.SUBSCRIBE) {
|
|
|
|
|
|
|
|
value.getPromise().trySuccess(value);
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
return listener;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|