Feature - subscribeOnElements method added to RBlockingQueue. subscribeOnFirstElements and subscribeOnLastElements methods added to RBlockingDeque #2346

pull/2348/head
Nikita Koksharov 6 years ago
parent 21158954b9
commit 72afbe83d9

@ -0,0 +1,92 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.connection.ConnectionManager;
import org.redisson.misc.RedissonPromise;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* @author Nikita Koksharov
*/
public class ElementsSubscribeService {
private final Map<Integer, RFuture<?>> subscribeListeners = new HashMap<>();
private final ConnectionManager connectionManager;
public ElementsSubscribeService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
public <V> int subscribeOnElements(Supplier<RFuture<V>> func, Consumer<V> consumer) {
int id = System.identityHashCode(consumer);
synchronized (subscribeListeners) {
RFuture<?> currentFuture = subscribeListeners.putIfAbsent(id, RedissonPromise.newSucceededFuture(null));
if (currentFuture != null) {
throw new IllegalArgumentException("Consumer object with listener id " + id + " already registered");
}
}
resubscribe(func, consumer);
return id;
}
public void unsubscribe(int listenerId) {
RFuture<?> f;
synchronized (subscribeListeners) {
f = subscribeListeners.remove(listenerId);
}
if (f != null) {
f.cancel(false);
}
}
private <V> void resubscribe(Supplier<RFuture<V>> func, Consumer<V> consumer) {
int listenerId = System.identityHashCode(consumer);
if (!subscribeListeners.containsKey(listenerId)) {
return;
}
RFuture<V> f;
synchronized (subscribeListeners) {
if (!subscribeListeners.containsKey(listenerId)) {
return;
}
f = func.get();
subscribeListeners.put(listenerId, f);
}
f.onComplete((r, e) -> {
if (e != null) {
connectionManager.newTimeout(t -> {
resubscribe(func, consumer);
}, 1, TimeUnit.SECONDS);
return;
}
consumer.accept(r);
resubscribe(func, consumer);
});
}
}

@ -17,6 +17,7 @@ package org.redisson;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RFuture;
@ -126,7 +127,17 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
}
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return blockingQueue.subscribeOnElements(consumer);
}
@Override
public void unsubscribe(int id) {
blockingQueue.unsubscribe(id);
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
@ -229,6 +240,16 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames));
}
@Override
public int subscribeOnFirstElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override
public int subscribeOnLastElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer);
}
@Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BRPOP_VALUE, toSeconds(timeout, unit), queueNames);

@ -15,10 +15,6 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -28,6 +24,11 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.BlockingQueue}.
*
@ -175,4 +176,15 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
"return vals",
Collections.<Object>singletonList(getName()), maxElements);
}
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId);
}
}

@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RFuture;
@ -233,7 +234,17 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
}
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId);
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);

@ -17,6 +17,7 @@ package org.redisson;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityBlockingDeque;
@ -98,7 +99,17 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
}
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId);
}
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@ -215,6 +226,16 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames));
}
@Override
public int subscribeOnFirstElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer);
}
@Override
public int subscribeOnLastElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer);
}
@Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
throw new UnsupportedOperationException();

@ -19,6 +19,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityBlockingQueue;
@ -142,7 +143,17 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
}
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId);
}
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* {@link BlockingDeque} backed by Redis
@ -58,4 +59,22 @@ public interface RBlockingDeque<V> extends BlockingDeque<V>, RBlockingQueue<V>,
*/
V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException;
/**
* Subscribes on first elements appeared in this queue.
* Continuously invokes {@link #takeFirstAsync()} method to get a new element.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnFirstElements(Consumer<V> consumer);
/**
* Subscribes on last elements appeared in this queue.
* Continuously invokes {@link #takeLastAsync()} method to get a new element.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnLastElements(Consumer<V> consumer);
}

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Distributed implementation of {@link BlockingQueue}
@ -69,4 +70,20 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
*/
V takeLastAndOfferFirstTo(String queueName) throws InterruptedException;
/**
* Subscribes on elements appeared in this queue.
* Continuously invokes {@link #takeAsync()} method to get a new element.
*
* @param consumer - queue elements listener
* @return listenerId - id of listener
*/
int subscribeOnElements(Consumer<V> consumer);
/**
* Un-subscribes defined listener.
*
* @param listenerId - id of listener
*/
void unsubscribe(int listenerId);
}

@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.redisson.ElementsSubscribeService;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
@ -50,7 +51,9 @@ public interface ConnectionManager {
String getId();
CommandSyncService getCommandExecutor();
ElementsSubscribeService getElementsSubscribeService();
PublishSubscribeService getSubscribeService();
ExecutorService getExecutor();

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import org.redisson.ElementsSubscribeService;
import org.redisson.Version;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
@ -151,7 +152,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final Config cfg;
protected final AddressResolverGroup<InetSocketAddress> resolverGroup;
private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this);
private PublishSubscribeService subscribeService;
private final Map<Object, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
@ -727,7 +730,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public PublishSubscribeService getSubscribeService() {
return subscribeService;
}
public ElementsSubscribeService getElementsSubscribeService() {
return elementsSubscribeService;
}
public ExecutorService getExecutor() {
return executor;
}

@ -634,4 +634,31 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
Assert.fail(e.getLocalizedMessage());
}
}
@Test
public void testSubscribeOnElements() throws InterruptedException {
RBlockingQueue<Integer> q = redisson.getBlockingQueue("test");
Set<Integer> values = new HashSet<>();
int listnerId = q.subscribeOnElements(v -> {
values.add(v);
});
for (int i = 0; i < 10; i++) {
q.add(i);
}
Awaitility.await().atMost(Duration.ONE_SECOND).until(() -> {
return values.size() == 10;
});
q.unsubscribe(listnerId);
q.add(11);
q.add(12);
Thread.sleep(1000);
assertThat(values).hasSize(10);
}
}

Loading…
Cancel
Save