Improvement - RedissonReliableTopic internal structure optimization #5027

pull/5038/head
Nikita Koksharov 2 years ago
parent 760972b1c4
commit 9d2ec1764f

@ -16,10 +16,9 @@
package org.redisson;
import io.netty.util.Timeout;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import org.redisson.api.StreamMessageId;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
@ -29,8 +28,11 @@ import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -68,13 +70,16 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
private final AtomicReference<String> subscriberId = new AtomicReference<>();
private volatile RFuture<Map<StreamMessageId, Map<String, Object>>> readFuture;
private volatile Timeout timeoutTask;
private final RStream<String, Object> stream;
public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
stream = new RedissonStream<>(new CompositeCodec(StringCodec.INSTANCE, codec), commandExecutor, name);
}
public RedissonReliableTopic(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
stream = new RedissonStream<>(new CompositeCodec(StringCodec.INSTANCE, codec), commandExecutor, name);
}
private String getTimeout() {
@ -112,7 +117,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
}
public RFuture<Long> sizeAsync() {
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XLEN, getRawName());
return stream.sizeAsync();
}
@Override
@ -159,18 +164,31 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
}
private void poll(String id) {
readFuture = commandExecutor.readAsync(getRawName(), new CompositeCodec(StringCodec.INSTANCE, codec),
RedisCommands.XREADGROUP_BLOCKING_SINGLE, "GROUP", id, "consumer", "BLOCK", 0, "STREAMS", getRawName(), ">");
readFuture.whenComplete((res, ex) -> {
if (readFuture.isCancelled()) {
return;
RFuture<Map<StreamMessageId, Map<String, Object>>> f = stream.pendingRangeAsync(id, StreamMessageId.MIN, StreamMessageId.MAX, 100);
CompletionStage<Map<StreamMessageId, Map<String, Object>>> ff = f.thenCompose(r -> {
if (listeners.isEmpty()) {
return CompletableFuture.completedFuture(r);
}
if (r.isEmpty()) {
readFuture = stream.readGroupAsync(id, "consumer",
StreamReadGroupArgs.neverDelivered().timeout(Duration.ofSeconds(0)));
return readFuture;
}
return CompletableFuture.completedFuture(r);
});
ff.whenComplete((res, ex) -> {
if (ex != null) {
if (ex instanceof RedissonShutdownException) {
return;
}
log.error(ex.getMessage(), ex);
if (ex.getCause().getMessage().contains("NOGROUP")) {
return;
}
log.error(ex.getCause().getMessage(), ex.getCause());
getServiceManager().newTimeout(task -> {
poll(id);
@ -178,59 +196,66 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
return;
}
if (listeners.isEmpty()) {
return;
CompletableFuture<Void> done = new CompletableFuture<>();
if (!listeners.isEmpty()) {
getServiceManager().getExecutor().execute(() -> {
for (Map.Entry<StreamMessageId, Map<String, Object>> entry : res.entrySet()) {
Object m = entry.getValue().get("m");
listeners.values().forEach(e -> {
if (e.getType().isInstance(m)) {
((MessageListener<Object>) e.getListener()).onMessage(getRawName(), m);
stream.ack(id, entry.getKey());
}
});
}
done.complete(null);
});
} else {
done.complete(null);
}
getServiceManager().getExecutor().execute(() -> {
res.values().forEach(entry -> {
Object m = entry.get("m");
listeners.values().forEach(e -> {
if (e.getType().isInstance(m)) {
((MessageListener<Object>) e.getListener()).onMessage(getRawName(), m);
done.thenAccept(r -> {
long time = System.currentTimeMillis();
RFuture<Boolean> updateFuture = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local expired = redis.call('zrangebyscore', KEYS[2], 0, tonumber(ARGV[2]) - 1); "
+ "for i, v in ipairs(expired) do "
+ "redis.call('xgroup', 'destroy', KEYS[1], v); "
+ "end; "
+ "local r = redis.call('zscore', KEYS[2], ARGV[1]); "
+ "local score = 92233720368547758;"
+ "local groups = redis.call('xinfo', 'groups', KEYS[1]); " +
"for i, v in ipairs(groups) do "
+ "local id1, id2 = string.match(v[8], '(.*)%-(.*)'); "
+ "score = math.min(tonumber(id1), score); "
+ "end; " +
"score = tostring(score) .. '-0';"
+ "local range = redis.call('xrange', KEYS[1], score, '+'); "
+ "if #range == 0 or (#range == 1 and range[1][1] == score) then "
+ "redis.call('xtrim', KEYS[1], 'maxlen', 0); "
+ "else "
+ "redis.call('xtrim', KEYS[1], 'maxlen', #range); "
+ "end;"
+ "return r ~= false; ",
Arrays.asList(getRawName(), getTimeout()),
id, time);
updateFuture.whenComplete((re, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
return;
}
});
});
});
long time = System.currentTimeMillis();
RFuture<Boolean> updateFuture = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local expired = redis.call('zrangebyscore', KEYS[2], 0, tonumber(ARGV[2]) - 1); "
+ "for i, v in ipairs(expired) do "
+ "redis.call('xgroup', 'destroy', KEYS[1], v); "
+ "end; "
+ "local r = redis.call('zscore', KEYS[2], ARGV[1]); "
+ "local score = 92233720368547758;"
+ "local groups = redis.call('xinfo', 'groups', KEYS[1]); " +
"for i, v in ipairs(groups) do "
+ "local id1, id2 = string.match(v[8], '(.*)%-(.*)'); "
+ "score = math.min(tonumber(id1), score); "
+ "end; " +
"score = tostring(score) .. '-0';"
+ "local range = redis.call('xrange', KEYS[1], score, '+'); "
+ "if #range == 0 or (#range == 1 and range[1][1] == score) then "
+ "redis.call('xtrim', KEYS[1], 'maxlen', 0); "
+ "else "
+ "redis.call('xtrim', KEYS[1], 'maxlen', #range); "
+ "end;"
+ "return r ~= false; ",
Arrays.asList(getRawName(), getTimeout()),
id, time);
updateFuture.whenComplete((re, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
log.error("Unable to update subscriber status", exc);
return;
}
log.error("Unable to update subscriber status", exc);
return;
}
if (!re || listeners.isEmpty()) {
return;
}
if (!re || listeners.isEmpty()) {
return;
}
poll(id);
poll(id);
});
});
});
@ -272,10 +297,10 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
}
private RFuture<Void> removeSubscriber() {
String id = subscriberId.getAndSet(null);
readFuture.cancel(false);
timeoutTask.cancel();
String id = subscriberId.getAndSet(null);
return commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('xgroup', 'destroy', KEYS[1], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], ARGV[1]); ",
@ -298,6 +323,11 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
private void renewExpiration() {
timeoutTask = getServiceManager().newTimeout(t -> {
String id = subscriberId.get();
if (id == null) {
return;
}
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('zscore', KEYS[1], ARGV[2]) == false then "
+ "return 0; "
@ -305,7 +335,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); "
+ "return 1; ",
Arrays.asList(getTimeout()),
System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout(), subscriberId.get());
System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout(), id);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update reliable topic {} expiration time", getRawName(), e);

@ -89,11 +89,12 @@ public class RedissonReliableTopicTest extends BaseTest {
assertThat(rt.countSubscribers()).isEqualTo(1);
assertThat(counter.get()).isEqualTo(10);
Thread.sleep(1000);
assertThat(rt.size()).isEqualTo(0);
}
@Test
public void testAutoTrim() {
public void testAutoTrim() throws InterruptedException {
RReliableTopic rt = redisson.getReliableTopic("test1");
AtomicInteger counter = new AtomicInteger();
rt.addListener(Integer.class, (ch, m) -> {
@ -109,6 +110,7 @@ public class RedissonReliableTopicTest extends BaseTest {
}
Awaitility.waitAtMost(Duration.ofSeconds(2)).until(() -> counter.get() == 20);
Thread.sleep(1000);
assertThat(rt.size()).isEqualTo(0);
}
@ -170,6 +172,7 @@ public class RedissonReliableTopicTest extends BaseTest {
assertThat(rt.publish("m1")).isEqualTo(1);
assertThat(a.await(1, TimeUnit.SECONDS)).isTrue();
Thread.sleep(200);
assertThat(rt.size()).isEqualTo(0);
RReliableTopic rt2 = redisson.getReliableTopic("test3");

Loading…
Cancel
Save