diff --git a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java index ecf250aa9..adb67f746 100644 --- a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java @@ -16,10 +16,9 @@ package org.redisson; import io.netty.util.Timeout; -import org.redisson.api.RFuture; -import org.redisson.api.RReliableTopic; -import org.redisson.api.StreamMessageId; +import org.redisson.api.*; import org.redisson.api.listener.MessageListener; +import org.redisson.api.stream.StreamReadGroupArgs; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -29,8 +28,11 @@ import org.redisson.misc.CompletableFutureWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -68,13 +70,16 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl private final AtomicReference subscriberId = new AtomicReference<>(); private volatile RFuture>> readFuture; private volatile Timeout timeoutTask; + private final RStream stream; public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); + stream = new RedissonStream<>(new CompositeCodec(StringCodec.INSTANCE, codec), commandExecutor, name); } public RedissonReliableTopic(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); + stream = new RedissonStream<>(new CompositeCodec(StringCodec.INSTANCE, codec), commandExecutor, name); } private String getTimeout() { @@ -112,7 +117,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl } public RFuture sizeAsync() { - return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.XLEN, getRawName()); + return stream.sizeAsync(); } @Override @@ -159,18 +164,31 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl } private void poll(String id) { - readFuture = commandExecutor.readAsync(getRawName(), new CompositeCodec(StringCodec.INSTANCE, codec), - RedisCommands.XREADGROUP_BLOCKING_SINGLE, "GROUP", id, "consumer", "BLOCK", 0, "STREAMS", getRawName(), ">"); - readFuture.whenComplete((res, ex) -> { - if (readFuture.isCancelled()) { - return; + RFuture>> f = stream.pendingRangeAsync(id, StreamMessageId.MIN, StreamMessageId.MAX, 100); + CompletionStage>> ff = f.thenCompose(r -> { + if (listeners.isEmpty()) { + return CompletableFuture.completedFuture(r); + } + + if (r.isEmpty()) { + readFuture = stream.readGroupAsync(id, "consumer", + StreamReadGroupArgs.neverDelivered().timeout(Duration.ofSeconds(0))); + return readFuture; } + return CompletableFuture.completedFuture(r); + }); + + ff.whenComplete((res, ex) -> { if (ex != null) { if (ex instanceof RedissonShutdownException) { return; } - log.error(ex.getMessage(), ex); + if (ex.getCause().getMessage().contains("NOGROUP")) { + return; + } + + log.error(ex.getCause().getMessage(), ex.getCause()); getServiceManager().newTimeout(task -> { poll(id); @@ -178,59 +196,66 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl return; } - if (listeners.isEmpty()) { - return; + CompletableFuture done = new CompletableFuture<>(); + if (!listeners.isEmpty()) { + getServiceManager().getExecutor().execute(() -> { + for (Map.Entry> entry : res.entrySet()) { + Object m = entry.getValue().get("m"); + listeners.values().forEach(e -> { + if (e.getType().isInstance(m)) { + ((MessageListener) e.getListener()).onMessage(getRawName(), m); + stream.ack(id, entry.getKey()); + } + }); + } + done.complete(null); + }); + } else { + done.complete(null); } - getServiceManager().getExecutor().execute(() -> { - res.values().forEach(entry -> { - Object m = entry.get("m"); - listeners.values().forEach(e -> { - if (e.getType().isInstance(m)) { - ((MessageListener) e.getListener()).onMessage(getRawName(), m); + done.thenAccept(r -> { + long time = System.currentTimeMillis(); + RFuture updateFuture = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local expired = redis.call('zrangebyscore', KEYS[2], 0, tonumber(ARGV[2]) - 1); " + + "for i, v in ipairs(expired) do " + + "redis.call('xgroup', 'destroy', KEYS[1], v); " + + "end; " + + "local r = redis.call('zscore', KEYS[2], ARGV[1]); " + + + "local score = 92233720368547758;" + + "local groups = redis.call('xinfo', 'groups', KEYS[1]); " + + "for i, v in ipairs(groups) do " + + "local id1, id2 = string.match(v[8], '(.*)%-(.*)'); " + + "score = math.min(tonumber(id1), score); " + + "end; " + + + "score = tostring(score) .. '-0';" + + "local range = redis.call('xrange', KEYS[1], score, '+'); " + + "if #range == 0 or (#range == 1 and range[1][1] == score) then " + + "redis.call('xtrim', KEYS[1], 'maxlen', 0); " + + "else " + + "redis.call('xtrim', KEYS[1], 'maxlen', #range); " + + "end;" + + "return r ~= false; ", + Arrays.asList(getRawName(), getTimeout()), + id, time); + + updateFuture.whenComplete((re, exc) -> { + if (exc != null) { + if (exc instanceof RedissonShutdownException) { + return; } - }); - }); - }); - - long time = System.currentTimeMillis(); - RFuture updateFuture = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local expired = redis.call('zrangebyscore', KEYS[2], 0, tonumber(ARGV[2]) - 1); " - + "for i, v in ipairs(expired) do " - + "redis.call('xgroup', 'destroy', KEYS[1], v); " - + "end; " - + "local r = redis.call('zscore', KEYS[2], ARGV[1]); " - - + "local score = 92233720368547758;" - + "local groups = redis.call('xinfo', 'groups', KEYS[1]); " + - "for i, v in ipairs(groups) do " - + "local id1, id2 = string.match(v[8], '(.*)%-(.*)'); " - + "score = math.min(tonumber(id1), score); " - + "end; " + - "score = tostring(score) .. '-0';" - + "local range = redis.call('xrange', KEYS[1], score, '+'); " - + "if #range == 0 or (#range == 1 and range[1][1] == score) then " - + "redis.call('xtrim', KEYS[1], 'maxlen', 0); " - + "else " - + "redis.call('xtrim', KEYS[1], 'maxlen', #range); " - + "end;" - + "return r ~= false; ", - Arrays.asList(getRawName(), getTimeout()), - id, time); - updateFuture.whenComplete((re, exc) -> { - if (exc != null) { - if (exc instanceof RedissonShutdownException) { + log.error("Unable to update subscriber status", exc); return; } - log.error("Unable to update subscriber status", exc); - return; - } - if (!re || listeners.isEmpty()) { - return; - } + if (!re || listeners.isEmpty()) { + return; + } - poll(id); + poll(id); + }); }); }); @@ -272,10 +297,10 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl } private RFuture removeSubscriber() { + String id = subscriberId.getAndSet(null); readFuture.cancel(false); timeoutTask.cancel(); - String id = subscriberId.getAndSet(null); return commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "redis.call('xgroup', 'destroy', KEYS[1], ARGV[1]); " + "redis.call('zrem', KEYS[2], ARGV[1]); ", @@ -298,6 +323,11 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl private void renewExpiration() { timeoutTask = getServiceManager().newTimeout(t -> { + String id = subscriberId.get(); + if (id == null) { + return; + } + RFuture future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('zscore', KEYS[1], ARGV[2]) == false then " + "return 0; " @@ -305,7 +335,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl + "redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); " + "return 1; ", Arrays.asList(getTimeout()), - System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout(), subscriberId.get()); + System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout(), id); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update reliable topic {} expiration time", getRawName(), e); diff --git a/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java b/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java index 815b77c37..8cc303a53 100644 --- a/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java @@ -89,11 +89,12 @@ public class RedissonReliableTopicTest extends BaseTest { assertThat(rt.countSubscribers()).isEqualTo(1); assertThat(counter.get()).isEqualTo(10); + Thread.sleep(1000); assertThat(rt.size()).isEqualTo(0); } @Test - public void testAutoTrim() { + public void testAutoTrim() throws InterruptedException { RReliableTopic rt = redisson.getReliableTopic("test1"); AtomicInteger counter = new AtomicInteger(); rt.addListener(Integer.class, (ch, m) -> { @@ -109,6 +110,7 @@ public class RedissonReliableTopicTest extends BaseTest { } Awaitility.waitAtMost(Duration.ofSeconds(2)).until(() -> counter.get() == 20); + Thread.sleep(1000); assertThat(rt.size()).isEqualTo(0); } @@ -170,6 +172,7 @@ public class RedissonReliableTopicTest extends BaseTest { assertThat(rt.publish("m1")).isEqualTo(1); assertThat(a.await(1, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); assertThat(rt.size()).isEqualTo(0); RReliableTopic rt2 = redisson.getReliableTopic("test3");