diff --git a/src/main/java/org/redisson/RedissonBlockingQueueReactive.java b/src/main/java/org/redisson/RedissonBlockingQueueReactive.java new file mode 100644 index 000000000..3c3d2c09c --- /dev/null +++ b/src/main/java/org/redisson/RedissonBlockingQueueReactive.java @@ -0,0 +1,111 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.connection.decoder.ListDrainToDecoder; +import org.redisson.core.RBlockingQueueReactive; + +/** + * Offers blocking queue facilities through an intermediary + * {@link LinkedBlockingQueue} where items are added as soon as + * blpop returns. All {@link BlockingQueue} methods are actually + * delegated to this intermediary queue. + * + * @author Nikita Koksharov + */ +public class RedissonBlockingQueueReactive extends RedissonQueueReactive implements RBlockingQueueReactive { + + protected RedissonBlockingQueueReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + protected RedissonBlockingQueueReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + + @Override + public Publisher put(V e) { + return offer(e); + } + + @Override + public Publisher take() { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0); + } + + @Override + public Publisher poll(long timeout, TimeUnit unit) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout)); + } + + /* + * (non-Javadoc) + * @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[]) + */ + @Override + public Publisher pollFromAny(long timeout, TimeUnit unit, String ... queueNames) { + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); + for (Object name : queueNames) { + params.add(name); + } + params.add(unit.toSeconds(timeout)); + return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray()); + } + + @Override + public Publisher pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout)); + } + + @Override + public Publisher drainTo(Collection c) { + if (c == null) { + throw new NullPointerException(); + } + + return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), + "local vals = redis.call('lrange', KEYS[1], 0, -1); " + + "redis.call('ltrim', KEYS[1], -1, 0); " + + "return vals", Collections.singletonList(getName())); + } + + @Override + public Publisher drainTo(Collection c, int maxElements) { + if (c == null) { + throw new NullPointerException(); + } + return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), + "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + + "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " + + "redis.call('ltrim', KEYS[1], elemNum + 1, -1); " + + "return vals", + Collections.singletonList(getName()), maxElements); + } +} \ No newline at end of file diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index 6d4d959bf..a9668f73e 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -29,11 +29,11 @@ import org.redisson.connection.ElasticacheConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SingleConnectionManager; +import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RListReactive; -import org.redisson.core.RMap; import org.redisson.core.RMapReactive; import org.redisson.core.RPatternTopicReactive; import org.redisson.core.RQueueReactive; @@ -193,6 +193,16 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonQueueReactive(codec, commandExecutor, name); } + @Override + public RBlockingQueueReactive getBlockingQueue(String name) { + return new RedissonBlockingQueueReactive(commandExecutor, name); + } + + @Override + public RBlockingQueueReactive getBlockingQueue(String name, Codec codec) { + return new RedissonBlockingQueueReactive(codec, commandExecutor, name); + } + public Config getConfig() { return config; } diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index e17d01caa..7bc1cf152 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -18,6 +18,7 @@ package org.redisson; import java.util.List; import org.redisson.client.codec.Codec; +import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RLexSortedSet; @@ -143,16 +144,16 @@ public interface RedissonReactiveClient { RQueueReactive getQueue(String name, Codec codec); -// /** -// * Returns blocking queue instance by name. -// * -// * @param name of queue -// * @return -// */ -// RBlockingQueue getBlockingQueue(String name); -// -// RBlockingQueue getBlockingQueue(String name, Codec codec); -// + /** + * Returns blocking queue instance by name. + * + * @param name of queue + * @return + */ + RBlockingQueueReactive getBlockingQueue(String name); + + RBlockingQueueReactive getBlockingQueue(String name, Codec codec); + // /** // * Returns deque instance by name. // * diff --git a/src/main/java/org/redisson/core/RBlockingQueueReactive.java b/src/main/java/org/redisson/core/RBlockingQueueReactive.java new file mode 100644 index 000000000..4d00e25ae --- /dev/null +++ b/src/main/java/org/redisson/core/RBlockingQueueReactive.java @@ -0,0 +1,59 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * {@link BlockingQueue} backed by Redis + * + * @author Nikita Koksharov + * @param the type of elements held in this collection + */ +public interface RBlockingQueueReactive extends RQueueReactive { + + /** + * Retrieves and removes first available head of any queue in mode, waiting up to the + * specified wait time if necessary for an element to become available + * in any of defined queues including queue own. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return Publisher object with the head of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + Publisher pollFromAny(long timeout, TimeUnit unit, String ... queueNames); + + Publisher drainTo(Collection c, int maxElements); + + Publisher drainTo(Collection c); + + Publisher pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit); + + Publisher poll(long timeout, TimeUnit unit); + + Publisher take(); + + Publisher put(V e); + +} diff --git a/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java b/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java new file mode 100644 index 000000000..5563714a0 --- /dev/null +++ b/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java @@ -0,0 +1,211 @@ +package org.redisson; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.core.RBlockingQueueReactive; + +public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest { + + @Test + public void testPollFromAny() throws InterruptedException { + final RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() { + @Override + public void run() { + RBlockingQueueReactive queue2 = redisson.getBlockingQueue("queue:pollany1"); + RBlockingQueueReactive queue3 = redisson.getBlockingQueue("queue:pollany2"); + sync(queue3.put(2)); + sync(queue1.put(1)); + sync(queue2.put(3)); + } + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = sync(queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2")); + + Assert.assertEquals(2, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + + @Test + public void testTake() throws InterruptedException { + RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() { + @Override + public void run() { + RBlockingQueueReactive queue = redisson.getBlockingQueue("queue:take"); + sync(queue.put(3)); + } + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = sync(queue1.take()); + + Assert.assertEquals(3, l); + Assert.assertTrue(System.currentTimeMillis() - s > 9000); + } + + @Test + public void testPoll() throws InterruptedException { + RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue1"); + sync(queue1.put(1)); + Assert.assertEquals((Integer)1, sync(queue1.poll(2, TimeUnit.SECONDS))); + + long s = System.currentTimeMillis(); + Assert.assertNull(sync(queue1.poll(5, TimeUnit.SECONDS))); + Assert.assertTrue(System.currentTimeMillis() - s > 5000); + } + @Test + public void testAwait() throws InterruptedException { + RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue1"); + sync(queue1.put(1)); + + Assert.assertEquals((Integer)1, sync(queue1.poll(10, TimeUnit.SECONDS))); + } + + @Test + public void testPollLastAndOfferFirstTo() throws InterruptedException { + RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue1"); + sync(queue1.put(1)); + sync(queue1.put(2)); + sync(queue1.put(3)); + + RBlockingQueueReactive queue2 = redisson.getBlockingQueue("queue2"); + sync(queue2.put(4)); + sync(queue2.put(5)); + sync(queue2.put(6)); + + sync(queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS)); + MatcherAssert.assertThat(sync(queue2), Matchers.contains(3, 4, 5, 6)); + } + + @Test + public void testAddOffer() { + RBlockingQueueReactive queue = redisson.getBlockingQueue("blocking:queue"); + sync(queue.add(1)); + sync(queue.offer(2)); + sync(queue.add(3)); + sync(queue.offer(4)); + + //MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4)); + Assert.assertEquals((Integer) 1, sync(queue.poll())); + MatcherAssert.assertThat(sync(queue), Matchers.contains(2, 3, 4)); + Assert.assertEquals((Integer) 2, sync(queue.peek())); + } + + @Test + public void testRemove() { + RBlockingQueueReactive queue = redisson.getBlockingQueue("blocking:queue"); + sync(queue.add(1)); + sync(queue.add(2)); + sync(queue.add(3)); + sync(queue.add(4)); + + sync(queue.poll()); + sync(queue.poll()); + + MatcherAssert.assertThat(sync(queue), Matchers.contains(3, 4)); + sync(queue.poll()); + sync(queue.poll()); + + Assert.assertEquals(0, sync(queue.size()).intValue()); + } + + @Test + public void testRemoveEmpty() { + RBlockingQueueReactive queue = redisson.getBlockingQueue("blocking:queue"); + Assert.assertNull(sync(queue.poll())); + } + + @Test + public void testDrainTo() { + RBlockingQueueReactive queue = redisson.getBlockingQueue("queue"); + for (int i = 0 ; i < 100; i++) { + sync(queue.offer(i)); + } + Assert.assertEquals(100, sync(queue.size()).intValue()); + Set batch = new HashSet(); + int count = sync(queue.drainTo(batch, 10)); + Assert.assertEquals(10, count); + Assert.assertEquals(10, batch.size()); + Assert.assertEquals(90, sync(queue.size()).intValue()); + sync(queue.drainTo(batch, 10)); + sync(queue.drainTo(batch, 20)); + sync(queue.drainTo(batch, 60)); + Assert.assertEquals(0, sync(queue.size()).intValue()); + } + + @Test + public void testBlockingQueue() { + + RBlockingQueueReactive queue = redisson.getBlockingQueue("test_:blocking:queue:"); + + ExecutorService executor = Executors.newFixedThreadPool(10); + + final AtomicInteger counter = new AtomicInteger(); + int total = 100; + for (int i = 0; i < total; i++) { + // runnable won't be executed in any particular order, and hence, int value as well. + executor.submit(new Runnable() { + @Override + public void run() { + redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet()); + } + }); + } + int count = 0; + while (count < total) { + int item = sync(queue.take()); + assertTrue(item > 0 && item <= total); + count++; + } + + assertThat(counter.get(), equalTo(total)); + } + + @Test + public void testDrainToCollection() throws Exception { + RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue1"); + sync(queue1.put(1)); + sync(queue1.put(2L)); + sync(queue1.put("e")); + + ArrayList dst = new ArrayList(); + sync(queue1.drainTo(dst)); + MatcherAssert.assertThat(dst, Matchers.contains(1, 2L, "e")); + Assert.assertEquals(0, sync(queue1.size()).intValue()); + } + + @Test + public void testDrainToCollectionLimited() throws Exception { + RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue1"); + sync(queue1.put(1)); + sync(queue1.put(2L)); + sync(queue1.put("e")); + + ArrayList dst = new ArrayList(); + sync(queue1.drainTo(dst, 2)); + MatcherAssert.assertThat(dst, Matchers.contains(1, 2L)); + Assert.assertEquals(1, sync(queue1.size()).intValue()); + + dst.clear(); + sync(queue1.drainTo(dst, 2)); + MatcherAssert.assertThat(dst, Matchers.contains("e")); + + + } +}