Feature - new subscribeOnElements(), subscribeOnLastElements() and subscribeOnFirstElements() methods wait for CompletionStage to complete before polling the next element. #5786

pull/5802/head
Nikita Koksharov 11 months ago
parent 0bc90c8108
commit 6966c0f70c

@ -15,21 +15,25 @@
*/
package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.connection.ServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author Nikita Koksharov
*/
public class ElementsSubscribeService {
private static final Logger log = LoggerFactory.getLogger(ElementsSubscribeService.class);
private final Map<Integer, CompletableFuture<?>> subscribeListeners = new ConcurrentHashMap<>();
private final ServiceManager serviceManager;
@ -37,7 +41,17 @@ public class ElementsSubscribeService {
this.serviceManager = serviceManager;
}
public <V> int subscribeOnElements(Supplier<RFuture<V>> func, Consumer<V> consumer) {
public <V> int subscribeOnElements(Supplier<CompletionStage<V>> func, Function<V, CompletionStage<Void>> consumer) {
int id = System.identityHashCode(consumer);
CompletableFuture<?> currentFuture = subscribeListeners.putIfAbsent(id, CompletableFuture.completedFuture(null));
if (currentFuture != null) {
throw new IllegalArgumentException("Consumer object with listener id " + id + " already registered");
}
resubscribe(func, consumer);
return id;
}
public <V> int subscribeOnElements(Supplier<CompletionStage<V>> func, Consumer<V> consumer) {
int id = System.identityHashCode(consumer);
CompletableFuture<?> currentFuture = subscribeListeners.putIfAbsent(id, CompletableFuture.completedFuture(null));
if (currentFuture != null) {
@ -54,9 +68,9 @@ public class ElementsSubscribeService {
}
}
private <V> void resubscribe(Supplier<RFuture<V>> func, Consumer<V> consumer) {
private <V> void resubscribe(Supplier<CompletionStage<V>> func, Consumer<V> consumer) {
int listenerId = System.identityHashCode(consumer);
CompletableFuture<V> f = (CompletableFuture<V>) subscribeListeners.computeIfPresent(listenerId, (k, v) -> {
CompletionStage<V> f = (CompletionStage<V>) subscribeListeners.computeIfPresent(listenerId, (k, v) -> {
return func.get().toCompletableFuture();
});
if (f == null) {
@ -76,5 +90,27 @@ public class ElementsSubscribeService {
});
}
private <V> void resubscribe(Supplier<CompletionStage<V>> func, Function<V, CompletionStage<Void>> consumer) {
int listenerId = System.identityHashCode(consumer);
CompletionStage<V> f = (CompletionStage<V>) subscribeListeners.computeIfPresent(listenerId, (k, v) -> {
return func.get().toCompletableFuture();
});
if (f == null) {
return;
}
f.thenCompose(consumer).whenComplete((r, ex) -> {
if (ex != null) {
log.error(ex.getMessage(), ex);
serviceManager.newTimeout(t -> {
resubscribe(func, consumer);
}, 1, TimeUnit.SECONDS);
return;
}
resubscribe(func, consumer);
});
}
}

@ -15,13 +15,6 @@
*/
package org.redisson;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.Entry;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RFuture;
@ -32,6 +25,15 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.BlockingDeque}.
*
@ -169,6 +171,11 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
return blockingQueue.subscribeOnElements(consumer);
}
@Override
public int subscribeOnElements(Function<V, CompletionStage<Void>> consumer) {
return blockingQueue.subscribeOnElements(consumer);
}
@Override
public void unsubscribe(int id) {
blockingQueue.unsubscribe(id);
@ -278,12 +285,26 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public int subscribeOnFirstElements(Consumer<V> consumer) {
return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override
public int subscribeOnLastElements(Consumer<V> consumer) {
return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeLastAsync, consumer);
}
@Override
public int subscribeOnFirstElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override
public int subscribeOnLastElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeLastAsync, consumer);
}
@Override

@ -28,8 +28,10 @@ import org.redisson.misc.CompletableFutureWrapper;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -253,7 +255,14 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public int subscribeOnElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override

@ -34,6 +34,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* <p>Distributed and concurrent implementation of bounded {@link java.util.concurrent.BlockingQueue}.
@ -49,7 +50,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
blockingQueue = new RedissonBlockingQueue<>(commandExecutor, name, redisson);
semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), commandExecutor.getServiceManager().getCfg().getCodec());
semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), getServiceManager().getCfg().getCodec());
channelName = RedissonSemaphore.getChannelName(semaphore.getRawName());
}
@ -124,14 +125,14 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
}
private <V> RFuture<V> wrapTakeFuture(RFuture<V> takeFuture) {
CompletableFuture<V> f = takeFuture.toCompletableFuture().thenCompose(res -> {
CompletionStage<V> f = takeFuture.thenCompose(res -> {
if (res == null) {
return CompletableFuture.completedFuture(null);
}
return createSemaphore(null).releaseAsync().handle((r, ex) -> res);
});
f.whenComplete((r, e) -> {
if (f.isCancelled()) {
if (f.toCompletableFuture().isCancelled()) {
takeFuture.cancel(false);
}
});
@ -260,12 +261,19 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public int subscribeOnElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getServiceManager().getElementsSubscribeService().unsubscribe(listenerId);
getServiceManager().getElementsSubscribeService().unsubscribe(listenerId);
}
@Override

@ -30,8 +30,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* <p>Distributed and concurrent implementation of priority blocking deque.
@ -136,12 +138,19 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public int subscribeOnElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getServiceManager().getElementsSubscribeService().unsubscribe(listenerId);
getServiceManager().getElementsSubscribeService().unsubscribe(listenerId);
}
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
@ -267,12 +276,26 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override
public int subscribeOnFirstElements(Consumer<V> consumer) {
return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override
public int subscribeOnLastElements(Consumer<V> consumer) {
return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeLastAsync, consumer);
}
@Override
public int subscribeOnLastElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeLastAsync, consumer);
}
@Override
public int subscribeOnFirstElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override

@ -33,9 +33,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.PriorityBlockingQueue}.
@ -74,7 +76,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
protected <T> void takeAsync(CompletableFuture<V> result, long delay, long timeoutInMicro, RedisCommand<T> command, Object... params) {
long start = System.currentTimeMillis();
commandExecutor.getServiceManager().newTimeout(t -> {
getServiceManager().newTimeout(t -> {
RFuture<V> future = wrapLockedAsync(command, params);
future.whenComplete((res, e) -> {
if (e != null && !(e instanceof RedisConnectionException)) {
@ -175,12 +177,19 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public int subscribeOnElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getServiceManager().getElementsSubscribeService().unsubscribe(listenerId);
getServiceManager().getElementsSubscribeService().unsubscribe(listenerId);
}
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {

@ -36,8 +36,10 @@ import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
/**
@ -2010,12 +2012,26 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public int subscribeOnFirstElements(Consumer<V> consumer) {
return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override
public int subscribeOnLastElements(Consumer<V> consumer) {
return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeLastAsync, consumer);
}
@Override
public int subscribeOnFirstElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override
public int subscribeOnLastElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeLastAsync, consumer);
}
@Override

@ -36,8 +36,7 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
/**
@ -644,7 +643,14 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return getServiceManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override
public int subscribeOnElements(Function<V, CompletionStage<Void>> consumer) {
return getServiceManager().getElementsSubscribeService()
.subscribeOnElements(this::takeAsync, consumer);
}
@Override

@ -19,8 +19,10 @@ import org.redisson.api.queue.DequeMoveArgs;
import java.time.Duration;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* {@link BlockingDeque} backed by Redis
@ -65,7 +67,25 @@ public interface RBlockingDeque<V> extends BlockingDeque<V>, RBlockingQueue<V>,
V move(Duration timeout, DequeMoveArgs args);
/**
* Subscribes on first elements appeared in this queue.
* Use {@link #subscribeOnFirstElements(Function)} instead.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
@Deprecated
int subscribeOnFirstElements(Consumer<V> consumer);
/**
* Use {@link #subscribeOnLastElements(Function)} instead.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
@Deprecated
int subscribeOnLastElements(Consumer<V> consumer);
/**
* Use {@link #subscribeOnLastElements(Function)} instead.
* Continuously invokes {@link #takeFirstAsync()} method to get a new element.
* <p>
* NOTE: don't call blocking methods in the elements listener
@ -73,7 +93,7 @@ public interface RBlockingDeque<V> extends BlockingDeque<V>, RBlockingQueue<V>,
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnFirstElements(Consumer<V> consumer);
int subscribeOnFirstElements(Function<V, CompletionStage<Void>> consumer);
/**
* Subscribes on last elements appeared in this queue.
@ -84,6 +104,6 @@ public interface RBlockingDeque<V> extends BlockingDeque<V>, RBlockingQueue<V>,
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnLastElements(Consumer<V> consumer);
int subscribeOnLastElements(Function<V, CompletionStage<Void>> consumer);
}

@ -19,8 +19,10 @@ import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Distributed implementation of {@link BlockingQueue}
@ -117,6 +119,15 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
*/
V takeLastAndOfferFirstTo(String queueName) throws InterruptedException;
/**
* Use {@link #subscribeOnElements(Function)} instead.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
@Deprecated
int subscribeOnElements(Consumer<V> consumer);
/**
* Subscribes on elements appeared in this queue.
* Continuously invokes {@link #takeAsync()} method to get a new element.
@ -126,7 +137,7 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnElements(Consumer<V> consumer);
int subscribeOnElements(Function<V, CompletionStage<Void>> consumer);
/**
* Un-subscribes defined listener.

@ -21,8 +21,10 @@ import org.redisson.client.protocol.ScoredEntry;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
/**
@ -199,6 +201,24 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
V takeLast();
/**
* Use {@link #subscribeOnFirstElements(Function)} instead.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
@Deprecated
int subscribeOnFirstElements(Consumer<V> consumer);
/**
* Use {@link #subscribeOnLastElements(Function)} instead.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
@Deprecated
int subscribeOnLastElements(Consumer<V> consumer);
/**
* Subscribes on first elements appeared in this set.
* Continuously invokes {@link #takeFirstAsync()} method to get a new element.
@ -208,7 +228,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnFirstElements(Consumer<V> consumer);
int subscribeOnFirstElements(Function<V, CompletionStage<Void>> consumer);
/**
* Subscribes on last elements appeared in this set.
@ -219,7 +239,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnLastElements(Consumer<V> consumer);
int subscribeOnLastElements(Function<V, CompletionStage<Void>> consumer);
/**
* Un-subscribes defined listener.

Loading…
Cancel
Save