From 72afbe83d9a9658fc8675d1553a4707f25bfdfeb Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 30 Sep 2019 13:40:33 +0300 Subject: [PATCH] Feature - subscribeOnElements method added to RBlockingQueue. subscribeOnFirstElements and subscribeOnLastElements methods added to RBlockingDeque #2346 --- .../redisson/ElementsSubscribeService.java | 92 +++++++++++++++++++ .../org/redisson/RedissonBlockingDeque.java | 23 ++++- .../org/redisson/RedissonBlockingQueue.java | 20 +++- .../RedissonBoundedBlockingQueue.java | 13 ++- .../RedissonPriorityBlockingDeque.java | 23 ++++- .../RedissonPriorityBlockingQueue.java | 13 ++- .../java/org/redisson/api/RBlockingDeque.java | 19 ++++ .../java/org/redisson/api/RBlockingQueue.java | 17 ++++ .../connection/ConnectionManager.java | 5 +- .../MasterSlaveConnectionManager.java | 11 ++- .../redisson/RedissonBlockingQueueTest.java | 27 ++++++ 11 files changed, 252 insertions(+), 11 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/ElementsSubscribeService.java diff --git a/redisson/src/main/java/org/redisson/ElementsSubscribeService.java b/redisson/src/main/java/org/redisson/ElementsSubscribeService.java new file mode 100644 index 000000000..38b22a0b0 --- /dev/null +++ b/redisson/src/main/java/org/redisson/ElementsSubscribeService.java @@ -0,0 +1,92 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import org.redisson.api.RFuture; +import org.redisson.connection.ConnectionManager; +import org.redisson.misc.RedissonPromise; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * @author Nikita Koksharov + */ +public class ElementsSubscribeService { + + private final Map> subscribeListeners = new HashMap<>(); + private final ConnectionManager connectionManager; + + public ElementsSubscribeService(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + + public int subscribeOnElements(Supplier> func, Consumer consumer) { + int id = System.identityHashCode(consumer); + synchronized (subscribeListeners) { + RFuture currentFuture = subscribeListeners.putIfAbsent(id, RedissonPromise.newSucceededFuture(null)); + if (currentFuture != null) { + throw new IllegalArgumentException("Consumer object with listener id " + id + " already registered"); + } + } + resubscribe(func, consumer); + return id; + } + + public void unsubscribe(int listenerId) { + RFuture f; + synchronized (subscribeListeners) { + f = subscribeListeners.remove(listenerId); + } + if (f != null) { + f.cancel(false); + } + } + + private void resubscribe(Supplier> func, Consumer consumer) { + int listenerId = System.identityHashCode(consumer); + if (!subscribeListeners.containsKey(listenerId)) { + return; + } + + RFuture f; + synchronized (subscribeListeners) { + if (!subscribeListeners.containsKey(listenerId)) { + return; + } + + f = func.get(); + subscribeListeners.put(listenerId, f); + } + + f.onComplete((r, e) -> { + if (e != null) { + connectionManager.newTimeout(t -> { + resubscribe(func, consumer); + }, 1, TimeUnit.SECONDS); + return; + } + + consumer.accept(r); + resubscribe(func, consumer); + }); + } + + +} diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index 6534070f0..823662e2e 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.redisson.api.RBlockingDeque; import org.redisson.api.RFuture; @@ -126,7 +127,17 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } - + + @Override + public int subscribeOnElements(Consumer consumer) { + return blockingQueue.subscribeOnElements(consumer); + } + + @Override + public void unsubscribe(int id) { + blockingQueue.unsubscribe(id); + } + @Override public RFuture takeLastAndOfferFirstToAsync(String queueName) { return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); @@ -229,6 +240,16 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames)); } + @Override + public int subscribeOnFirstElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer); + } + + @Override + public int subscribeOnLastElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer); + } + @Override public RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BRPOP_VALUE, toSeconds(timeout, unit), queueNames); diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index fb541c7f1..0de9c5c97 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -15,10 +15,6 @@ */ package org.redisson; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.TimeUnit; - import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -28,6 +24,11 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.ListDrainToDecoder; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + /** *

Distributed and concurrent implementation of {@link java.util.concurrent.BlockingQueue}. * @@ -175,4 +176,15 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock "return vals", Collections.singletonList(getName()), maxElements); } + + @Override + public int subscribeOnElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public void unsubscribe(int listenerId) { + commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId); + } + } \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index 72a0c7ba8..8e1abb21d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.redisson.api.RBoundedBlockingQueue; import org.redisson.api.RFuture; @@ -233,7 +234,17 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } - + + @Override + public int subscribeOnElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public void unsubscribe(int listenerId) { + commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId); + } + @Override public RFuture takeLastAndOfferFirstToAsync(String queueName) { return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java index 00a505cbe..06ba4140b 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.redisson.api.RFuture; import org.redisson.api.RPriorityBlockingDeque; @@ -98,7 +99,17 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } - + + @Override + public int subscribeOnElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public void unsubscribe(int listenerId) { + commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId); + } + public RFuture takeLastAndOfferFirstToAsync(String queueName) { return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); } @@ -215,6 +226,16 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames)); } + @Override + public int subscribeOnFirstElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeFirstAsync, consumer); + } + + @Override + public int subscribeOnLastElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeLastAsync, consumer); + } + @Override public RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { throw new UnsupportedOperationException(); diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index 75093784b..dd099bf9f 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.redisson.api.RFuture; import org.redisson.api.RPriorityBlockingQueue; @@ -142,7 +143,17 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } - + + @Override + public int subscribeOnElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public void unsubscribe(int listenerId) { + commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId); + } + public RFuture takeLastAndOfferFirstToAsync(String queueName) { return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingDeque.java b/redisson/src/main/java/org/redisson/api/RBlockingDeque.java index 3791321b3..d577a054f 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingDeque.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.concurrent.BlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * {@link BlockingDeque} backed by Redis @@ -58,4 +59,22 @@ public interface RBlockingDeque extends BlockingDeque, RBlockingQueue, */ V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException; + /** + * Subscribes on first elements appeared in this queue. + * Continuously invokes {@link #takeFirstAsync()} method to get a new element. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + int subscribeOnFirstElements(Consumer consumer); + + /** + * Subscribes on last elements appeared in this queue. + * Continuously invokes {@link #takeLastAsync()} method to get a new element. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + int subscribeOnLastElements(Consumer consumer); + } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java index 41d0f014c..85b51ba1a 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Distributed implementation of {@link BlockingQueue} @@ -69,4 +70,20 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin */ V takeLastAndOfferFirstTo(String queueName) throws InterruptedException; + /** + * Subscribes on elements appeared in this queue. + * Continuously invokes {@link #takeAsync()} method to get a new element. + * + * @param consumer - queue elements listener + * @return listenerId - id of listener + */ + int subscribeOnElements(Consumer consumer); + + /** + * Un-subscribes defined listener. + * + * @param listenerId - id of listener + */ + void unsubscribe(int listenerId); + } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index fc5b1e94a..def57b8f3 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.redisson.ElementsSubscribeService; import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; @@ -50,7 +51,9 @@ public interface ConnectionManager { String getId(); CommandSyncService getCommandExecutor(); - + + ElementsSubscribeService getElementsSubscribeService(); + PublishSubscribeService getSubscribeService(); ExecutorService getExecutor(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index f0b804029..1f73751be 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; +import org.redisson.ElementsSubscribeService; import org.redisson.Version; import org.redisson.api.NodeType; import org.redisson.api.RFuture; @@ -151,7 +152,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final Config cfg; protected final AddressResolverGroup resolverGroup; - + + private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this); + private PublishSubscribeService subscribeService; private final Map nodeConnections = new ConcurrentHashMap<>(); @@ -727,7 +730,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public PublishSubscribeService getSubscribeService() { return subscribeService; } - + + public ElementsSubscribeService getElementsSubscribeService() { + return elementsSubscribeService; + } + public ExecutorService getExecutor() { return executor; } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index d6dd87032..572a97bdd 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -634,4 +634,31 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { Assert.fail(e.getLocalizedMessage()); } } + + @Test + public void testSubscribeOnElements() throws InterruptedException { + RBlockingQueue q = redisson.getBlockingQueue("test"); + Set values = new HashSet<>(); + int listnerId = q.subscribeOnElements(v -> { + values.add(v); + }); + + for (int i = 0; i < 10; i++) { + q.add(i); + } + + Awaitility.await().atMost(Duration.ONE_SECOND).until(() -> { + return values.size() == 10; + }); + + q.unsubscribe(listnerId); + + q.add(11); + q.add(12); + + Thread.sleep(1000); + + assertThat(values).hasSize(10); + } + }