refactoring

pull/5038/head
Nikita Koksharov 2 years ago
parent 08eb573a21
commit 522326083c

@ -518,12 +518,12 @@ public class Redisson implements RedissonClient {
@Override
public RReliableTopic getReliableTopic(String name) {
return new RedissonReliableTopic(commandExecutor, name);
return new RedissonReliableTopic(commandExecutor, name, null);
}
@Override
public RReliableTopic getReliableTopic(String name, Codec codec) {
return new RedissonReliableTopic(codec, commandExecutor, name);
return new RedissonReliableTopic(codec, commandExecutor, name, null);
}
@Override

@ -405,14 +405,14 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RReliableTopicReactive getReliableTopic(String name) {
RedissonReliableTopic topic = new RedissonReliableTopic(commandExecutor, name);
RedissonReliableTopic topic = new RedissonReliableTopic(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonReliableTopicReactive(topic), RReliableTopicReactive.class);
}
@Override
public RReliableTopicReactive getReliableTopic(String name, Codec codec) {
RedissonReliableTopic topic = new RedissonReliableTopic(codec, commandExecutor, name);
RedissonReliableTopic topic = new RedissonReliableTopic(codec, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonReliableTopicReactive(topic), RReliableTopicReactive.class);
}

@ -16,7 +16,10 @@
package org.redisson;
import io.netty.util.Timeout;
import org.redisson.api.*;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import org.redisson.api.RStream;
import org.redisson.api.StreamMessageId;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.redisson.client.codec.Codec;
@ -30,13 +33,12 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -67,19 +69,23 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
}
private final Map<String, Entry> listeners = new ConcurrentHashMap<>();
private final AtomicReference<String> subscriberId = new AtomicReference<>();
private final String subscriberId;
private volatile RFuture<Map<StreamMessageId, Map<String, Object>>> readFuture;
private volatile Timeout timeoutTask;
private final RStream<String, Object> stream;
private final AtomicBoolean subscribed = new AtomicBoolean();
public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name, String subscriberId) {
super(codec, commandExecutor, name);
stream = new RedissonStream<>(new CompositeCodec(StringCodec.INSTANCE, codec), commandExecutor, name);
if (subscriberId == null) {
subscriberId = getServiceManager().generateId();
}
this.subscriberId = subscriberId;
}
public RedissonReliableTopic(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
stream = new RedissonStream<>(new CompositeCodec(StringCodec.INSTANCE, codec), commandExecutor, name);
public RedissonReliableTopic(CommandAsyncExecutor commandExecutor, String name, String subscriberId) {
this(commandExecutor.getServiceManager().getCfg().getCodec(), commandExecutor, name, subscriberId);
}
private String getTimeout() {
@ -140,33 +146,29 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
String id = getServiceManager().generateId();
listeners.put(id, new Entry(type, listener));
if (subscriberId.get() != null) {
if (!subscribed.compareAndSet(false, true)) {
return new CompletableFutureWrapper<>(id);
}
if (subscriberId.compareAndSet(null, id)) {
renewExpiration();
RFuture<Void> addFuture = commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('zadd', KEYS[2], ARGV[3], ARGV[2]);" +
"redis.call('xgroup', 'create', KEYS[1], ARGV[2], ARGV[1], 'MKSTREAM'); ",
Arrays.asList(getRawName(), getTimeout()),
StreamMessageId.ALL, id, System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout());
CompletionStage<String> f = addFuture.thenApply(r -> {
poll(id);
return id;
});
renewExpiration();
return new CompletableFutureWrapper<>(f);
}
RFuture<Void> addFuture = commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('zadd', KEYS[2], ARGV[3], ARGV[2]);" +
"redis.call('xgroup', 'create', KEYS[1], ARGV[2], ARGV[1], 'MKSTREAM'); ",
Arrays.asList(getRawName(), getTimeout()),
StreamMessageId.ALL, subscriberId, System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout());
CompletionStage<String> f = addFuture.thenApply(r -> {
poll(subscriberId);
return id;
});
return new CompletableFutureWrapper<>(id);
return new CompletableFutureWrapper<>(f);
}
private void poll(String id) {
RFuture<Map<StreamMessageId, Map<String, Object>>> f = stream.pendingRangeAsync(id, StreamMessageId.MIN, StreamMessageId.MAX, 100);
CompletionStage<Map<StreamMessageId, Map<String, Object>>> ff = f.thenCompose(r -> {
if (listeners.isEmpty()) {
if (!subscribed.get()) {
return CompletableFuture.completedFuture(r);
}
@ -250,7 +252,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
return;
}
if (!re || listeners.isEmpty()) {
if (!re || !subscribed.get()) {
return;
}
@ -297,7 +299,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
}
private RFuture<Void> removeSubscriber() {
String id = subscriberId.getAndSet(null);
subscribed.set(false);
readFuture.cancel(false);
timeoutTask.cancel();
@ -305,7 +307,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
"redis.call('xgroup', 'destroy', KEYS[1], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], ARGV[1]); ",
Arrays.asList(getRawName(), getTimeout()),
id);
subscriberId);
}
@Override
@ -323,8 +325,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
private void renewExpiration() {
timeoutTask = getServiceManager().newTimeout(t -> {
String id = subscriberId.get();
if (id == null) {
if (!subscribed.get()) {
return;
}
@ -335,7 +336,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); "
+ "return 1; ",
Arrays.asList(getTimeout()),
System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout(), id);
System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout(), subscriberId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update reliable topic {} expiration time", getRawName(), e);

@ -386,12 +386,12 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RReliableTopicRx getReliableTopic(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonReliableTopic(commandExecutor, name), RReliableTopicRx.class);
return RxProxyBuilder.create(commandExecutor, new RedissonReliableTopic(commandExecutor, name, null), RReliableTopicRx.class);
}
@Override
public RReliableTopicRx getReliableTopic(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonReliableTopic(codec, commandExecutor, name), RReliableTopicRx.class);
return RxProxyBuilder.create(commandExecutor, new RedissonReliableTopic(codec, commandExecutor, name, null), RReliableTopicRx.class);
}
@Override

Loading…
Cancel
Save