From 6966c0f70c5731f3f5553a27805ecf7c15cb11d2 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 17 Apr 2024 08:04:44 +0300 Subject: [PATCH] Feature - new subscribeOnElements(), subscribeOnLastElements() and subscribeOnFirstElements() methods wait for CompletionStage to complete before polling the next element. #5786 --- .../redisson/ElementsSubscribeService.java | 48 ++++++++++++++++--- .../org/redisson/RedissonBlockingDeque.java | 39 +++++++++++---- .../org/redisson/RedissonBlockingQueue.java | 11 ++++- .../RedissonBoundedBlockingQueue.java | 18 +++++-- .../RedissonPriorityBlockingDeque.java | 31 ++++++++++-- .../RedissonPriorityBlockingQueue.java | 15 ++++-- .../org/redisson/RedissonScoredSortedSet.java | 20 +++++++- .../org/redisson/RedissonTransferQueue.java | 12 +++-- .../java/org/redisson/api/RBlockingDeque.java | 26 ++++++++-- .../java/org/redisson/api/RBlockingQueue.java | 13 ++++- .../org/redisson/api/RScoredSortedSet.java | 24 +++++++++- 11 files changed, 218 insertions(+), 39 deletions(-) diff --git a/redisson/src/main/java/org/redisson/ElementsSubscribeService.java b/redisson/src/main/java/org/redisson/ElementsSubscribeService.java index c5d3c3c06..c680ce65e 100644 --- a/redisson/src/main/java/org/redisson/ElementsSubscribeService.java +++ b/redisson/src/main/java/org/redisson/ElementsSubscribeService.java @@ -15,21 +15,25 @@ */ package org.redisson; -import org.redisson.api.RFuture; import org.redisson.connection.ServiceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; /** * @author Nikita Koksharov */ public class ElementsSubscribeService { - + + private static final Logger log = LoggerFactory.getLogger(ElementsSubscribeService.class); private final Map> subscribeListeners = new ConcurrentHashMap<>(); private final ServiceManager serviceManager; @@ -37,7 +41,17 @@ public class ElementsSubscribeService { this.serviceManager = serviceManager; } - public int subscribeOnElements(Supplier> func, Consumer consumer) { + public int subscribeOnElements(Supplier> func, Function> consumer) { + int id = System.identityHashCode(consumer); + CompletableFuture currentFuture = subscribeListeners.putIfAbsent(id, CompletableFuture.completedFuture(null)); + if (currentFuture != null) { + throw new IllegalArgumentException("Consumer object with listener id " + id + " already registered"); + } + resubscribe(func, consumer); + return id; + } + + public int subscribeOnElements(Supplier> func, Consumer consumer) { int id = System.identityHashCode(consumer); CompletableFuture currentFuture = subscribeListeners.putIfAbsent(id, CompletableFuture.completedFuture(null)); if (currentFuture != null) { @@ -54,9 +68,9 @@ public class ElementsSubscribeService { } } - private void resubscribe(Supplier> func, Consumer consumer) { + private void resubscribe(Supplier> func, Consumer consumer) { int listenerId = System.identityHashCode(consumer); - CompletableFuture f = (CompletableFuture) subscribeListeners.computeIfPresent(listenerId, (k, v) -> { + CompletionStage f = (CompletionStage) subscribeListeners.computeIfPresent(listenerId, (k, v) -> { return func.get().toCompletableFuture(); }); if (f == null) { @@ -76,5 +90,27 @@ public class ElementsSubscribeService { }); } - + private void resubscribe(Supplier> func, Function> consumer) { + int listenerId = System.identityHashCode(consumer); + CompletionStage f = (CompletionStage) subscribeListeners.computeIfPresent(listenerId, (k, v) -> { + return func.get().toCompletableFuture(); + }); + if (f == null) { + return; + } + + f.thenCompose(consumer).whenComplete((r, ex) -> { + if (ex != null) { + log.error(ex.getMessage(), ex); + serviceManager.newTimeout(t -> { + resubscribe(func, consumer); + }, 1, TimeUnit.SECONDS); + return; + } + + resubscribe(func, consumer); + }); + } + + } diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index 53071487e..cd8ceb71f 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -15,13 +15,6 @@ */ package org.redisson; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - import org.redisson.api.Entry; import org.redisson.api.RBlockingDeque; import org.redisson.api.RFuture; @@ -32,6 +25,15 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + /** *

Distributed and concurrent implementation of {@link java.util.concurrent.BlockingDeque}. * @@ -169,6 +171,11 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock return blockingQueue.subscribeOnElements(consumer); } + @Override + public int subscribeOnElements(Function> consumer) { + return blockingQueue.subscribeOnElements(consumer); + } + @Override public void unsubscribe(int id) { blockingQueue.unsubscribe(id); @@ -278,12 +285,26 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public int subscribeOnFirstElements(Consumer consumer) { - return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeFirstAsync, consumer); } @Override public int subscribeOnLastElements(Consumer consumer) { - return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeLastAsync, consumer); + } + + @Override + public int subscribeOnFirstElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeFirstAsync, consumer); + } + + @Override + public int subscribeOnLastElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeLastAsync, consumer); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index d0da3f766..dab6a64c5 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -28,8 +28,10 @@ import org.redisson.misc.CompletableFutureWrapper; import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -253,7 +255,14 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public int subscribeOnElements(Consumer consumer) { - return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public int subscribeOnElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index a1ca725d8..ee32ea91e 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -34,6 +34,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; /** *

Distributed and concurrent implementation of bounded {@link java.util.concurrent.BlockingQueue}. @@ -49,7 +50,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name, redisson); blockingQueue = new RedissonBlockingQueue<>(commandExecutor, name, redisson); - semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), commandExecutor.getServiceManager().getCfg().getCodec()); + semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), getServiceManager().getCfg().getCodec()); channelName = RedissonSemaphore.getChannelName(semaphore.getRawName()); } @@ -124,14 +125,14 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements } private RFuture wrapTakeFuture(RFuture takeFuture) { - CompletableFuture f = takeFuture.toCompletableFuture().thenCompose(res -> { + CompletionStage f = takeFuture.thenCompose(res -> { if (res == null) { return CompletableFuture.completedFuture(null); } return createSemaphore(null).releaseAsync().handle((r, ex) -> res); }); f.whenComplete((r, e) -> { - if (f.isCancelled()) { + if (f.toCompletableFuture().isCancelled()) { takeFuture.cancel(false); } }); @@ -260,12 +261,19 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public int subscribeOnElements(Consumer consumer) { - return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public int subscribeOnElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); } @Override public void unsubscribe(int listenerId) { - commandExecutor.getServiceManager().getElementsSubscribeService().unsubscribe(listenerId); + getServiceManager().getElementsSubscribeService().unsubscribe(listenerId); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java index b6d115db8..d38bd7c14 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -30,8 +30,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; /** *

Distributed and concurrent implementation of priority blocking deque. @@ -136,12 +138,19 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public int subscribeOnElements(Consumer consumer) { - return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public int subscribeOnElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); } @Override public void unsubscribe(int listenerId) { - commandExecutor.getServiceManager().getElementsSubscribeService().unsubscribe(listenerId); + getServiceManager().getElementsSubscribeService().unsubscribe(listenerId); } public RFuture takeLastAndOfferFirstToAsync(String queueName) { @@ -267,12 +276,26 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public int subscribeOnFirstElements(Consumer consumer) { - return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeFirstAsync, consumer); } @Override public int subscribeOnLastElements(Consumer consumer) { - return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeLastAsync, consumer); + } + + @Override + public int subscribeOnLastElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeLastAsync, consumer); + } + + @Override + public int subscribeOnFirstElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeFirstAsync, consumer); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index ad83181e0..efa2e8850 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -33,9 +33,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; /** *

Distributed and concurrent implementation of {@link java.util.concurrent.PriorityBlockingQueue}. @@ -74,7 +76,7 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i protected void takeAsync(CompletableFuture result, long delay, long timeoutInMicro, RedisCommand command, Object... params) { long start = System.currentTimeMillis(); - commandExecutor.getServiceManager().newTimeout(t -> { + getServiceManager().newTimeout(t -> { RFuture future = wrapLockedAsync(command, params); future.whenComplete((res, e) -> { if (e != null && !(e instanceof RedisConnectionException)) { @@ -175,12 +177,19 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i @Override public int subscribeOnElements(Consumer consumer) { - return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public int subscribeOnElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); } @Override public void unsubscribe(int listenerId) { - commandExecutor.getServiceManager().getElementsSubscribeService().unsubscribe(listenerId); + getServiceManager().getElementsSubscribeService().unsubscribe(listenerId); } public RFuture takeLastAndOfferFirstToAsync(String queueName) { diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 6b51a0332..7583b978a 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -36,8 +36,10 @@ import java.time.Duration; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Stream; /** @@ -2010,12 +2012,26 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc @Override public int subscribeOnFirstElements(Consumer consumer) { - return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeFirstAsync, consumer); } @Override public int subscribeOnLastElements(Consumer consumer) { - return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeLastAsync, consumer); + } + + @Override + public int subscribeOnFirstElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeFirstAsync, consumer); + } + + @Override + public int subscribeOnLastElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeLastAsync, consumer); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java index 520a00a06..3525624b0 100644 --- a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java @@ -36,8 +36,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.function.Consumer; - - +import java.util.function.Function; /** @@ -644,7 +643,14 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran @Override public int subscribeOnElements(Consumer consumer) { - return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public int subscribeOnElements(Function> consumer) { + return getServiceManager().getElementsSubscribeService() + .subscribeOnElements(this::takeAsync, consumer); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RBlockingDeque.java b/redisson/src/main/java/org/redisson/api/RBlockingDeque.java index 82385cd07..882f68ab7 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingDeque.java @@ -19,8 +19,10 @@ import org.redisson.api.queue.DequeMoveArgs; import java.time.Duration; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; /** * {@link BlockingDeque} backed by Redis @@ -65,7 +67,25 @@ public interface RBlockingDeque extends BlockingDeque, RBlockingQueue, V move(Duration timeout, DequeMoveArgs args); /** - * Subscribes on first elements appeared in this queue. + * Use {@link #subscribeOnFirstElements(Function)} instead. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + @Deprecated + int subscribeOnFirstElements(Consumer consumer); + + /** + * Use {@link #subscribeOnLastElements(Function)} instead. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + @Deprecated + int subscribeOnLastElements(Consumer consumer); + + /** + * Use {@link #subscribeOnLastElements(Function)} instead. * Continuously invokes {@link #takeFirstAsync()} method to get a new element. *

* NOTE: don't call blocking methods in the elements listener @@ -73,7 +93,7 @@ public interface RBlockingDeque extends BlockingDeque, RBlockingQueue, * @param consumer - queue elements listener * @return listenerId - id of listener */ - int subscribeOnFirstElements(Consumer consumer); + int subscribeOnFirstElements(Function> consumer); /** * Subscribes on last elements appeared in this queue. @@ -84,6 +104,6 @@ public interface RBlockingDeque extends BlockingDeque, RBlockingQueue, * @param consumer - queue elements listener * @return listenerId - id of listener */ - int subscribeOnLastElements(Consumer consumer); + int subscribeOnLastElements(Function> consumer); } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java index c88de596e..31ca70b47 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java @@ -19,8 +19,10 @@ import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; /** * Distributed implementation of {@link BlockingQueue} @@ -117,6 +119,15 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin */ V takeLastAndOfferFirstTo(String queueName) throws InterruptedException; + /** + * Use {@link #subscribeOnElements(Function)} instead. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + @Deprecated + int subscribeOnElements(Consumer consumer); + /** * Subscribes on elements appeared in this queue. * Continuously invokes {@link #takeAsync()} method to get a new element. @@ -126,7 +137,7 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin * @param consumer - queue elements listener * @return listenerId - id of listener */ - int subscribeOnElements(Consumer consumer); + int subscribeOnElements(Function> consumer); /** * Un-subscribes defined listener. diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 4826212f7..eae467a9c 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -21,8 +21,10 @@ import org.redisson.client.protocol.ScoredEntry; import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Stream; /** @@ -199,6 +201,24 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ V takeLast(); + /** + * Use {@link #subscribeOnFirstElements(Function)} instead. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + @Deprecated + int subscribeOnFirstElements(Consumer consumer); + + /** + * Use {@link #subscribeOnLastElements(Function)} instead. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + @Deprecated + int subscribeOnLastElements(Consumer consumer); + /** * Subscribes on first elements appeared in this set. * Continuously invokes {@link #takeFirstAsync()} method to get a new element. @@ -208,7 +228,7 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * @param consumer - queue elements listener * @return listenerId - id of listener */ - int subscribeOnFirstElements(Consumer consumer); + int subscribeOnFirstElements(Function> consumer); /** * Subscribes on last elements appeared in this set. @@ -219,7 +239,7 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * @param consumer - queue elements listener * @return listenerId - id of listener */ - int subscribeOnLastElements(Consumer consumer); + int subscribeOnLastElements(Function> consumer); /** * Un-subscribes defined listener.