RBlockingDeque implemented. #320

pull/337/head
Nikita 9 years ago
parent 8de1f5ebe7
commit 74d7f0d8bc

@ -127,6 +127,12 @@
<version>2.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>

@ -39,6 +39,7 @@ import org.redisson.core.NodesGroup;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RBatch;
import org.redisson.core.RBitSet;
import org.redisson.core.RBlockingDeque;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBucket;
import org.redisson.core.RCountDownLatch;
@ -342,6 +343,16 @@ public class Redisson implements RedissonClient {
return new RedissonDeque<V>(codec, commandExecutor, name);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name) {
return new RedissonBlockingDeque<V>(commandExecutor, name);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name, Codec codec) {
return new RedissonBlockingDeque<V>(codec, commandExecutor, name);
};
@Override
public RAtomicLong getAtomicLong(String name) {
return new RedissonAtomicLong(commandExecutor, name);

@ -0,0 +1,284 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.core.RBlockingDeque;
import io.netty.util.concurrent.Future;
/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.BlockingDeque}.
*
* <p>Queue size limited by Redis server memory amount. This is why {@link #remainingCapacity()} always
* returns <code>Integer.MAX_VALUE</code>
*
* @author Nikita Koksharov
*/
public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlockingDeque<V> {
protected RedissonBlockingDeque(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
protected RedissonBlockingDeque(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public Future<Boolean> putAsync(V e) {
return offerAsync(e);
}
/*
* (non-Javadoc)
* @see java.util.concurrent.BlockingQueue#put(java.lang.Object)
*/
@Override
public void put(V e) throws InterruptedException {
offer(e);
}
@Override
public boolean offer(V e, long timeout, TimeUnit unit) throws InterruptedException {
return offer(e);
}
@Override
public Future<V> takeAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}
/*
* (non-Javadoc)
* @see java.util.concurrent.BlockingQueue#take()
*/
@Override
public V take() throws InterruptedException {
Future<V> res = takeAsync();
return res.await().getNow();
}
@Override
public Future<V> pollAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
}
/*
* (non-Javadoc)
* @see java.util.concurrent.BlockingQueue#poll(long, java.util.concurrent.TimeUnit)
*/
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
Future<V> res = pollAsync(timeout, unit);
return res.await().getNow();
}
/*
* (non-Javadoc)
* @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/
@Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
Future<V> res = pollFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
}
/*
* (non-Javadoc)
* @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/
@Override
public Future<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(unit.toSeconds(timeout));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
}
@Override
public Future<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
}
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
Future<V> res = pollLastAndOfferFirstToAsync(queueName, timeout, unit);
return res.await().getNow();
}
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
@Override
public int drainTo(Collection<? super V> c) {
return get(drainToAsync(c));
}
@Override
public Future<Integer> drainToAsync(Collection<? super V> c) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals", Collections.<Object>singletonList(getName()));
}
@Override
public int drainTo(Collection<? super V> c, int maxElements) {
if (maxElements <= 0) {
return 0;
}
return get(drainToAsync(c, maxElements));
}
@Override
public Future<Integer> drainToAsync(Collection<? super V> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
"return vals",
Collections.<Object>singletonList(getName()), maxElements);
}
@Override
public Future<Void> putFirstAsync(V e) {
return addFirstAsync(e);
}
@Override
public Future<Void> putLastAsync(V e) {
return addLastAsync(e);
}
@Override
public void putFirst(V e) throws InterruptedException {
addFirst(e);
}
@Override
public void putLast(V e) throws InterruptedException {
addLast(e);
}
@Override
public boolean offerFirst(V e, long timeout, TimeUnit unit) throws InterruptedException {
addFirst(e);
return true;
}
@Override
public boolean offerLast(V e, long timeout, TimeUnit unit) throws InterruptedException {
addLast(e);
return true;
}
@Override
public V takeFirst() throws InterruptedException {
Future<V> res = takeFirstAsync();
return res.await().getNow();
}
@Override
public Future<V> takeFirstAsync() {
return takeAsync();
}
@Override
public Future<V> takeLastAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOP_VALUE, getName(), 0);
}
@Override
public V takeLast() throws InterruptedException {
Future<V> res = takeLastAsync();
return res.await().getNow();
}
@Override
public Future<V> pollFirstAsync(long timeout, TimeUnit unit) {
return pollAsync(timeout, unit);
}
@Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
Future<V> res = pollFirstFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
}
@Override
public Future<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
return pollFromAnyAsync(timeout, unit, queueNames);
}
@Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
Future<V> res = pollLastFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
}
@Override
public Future<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(unit.toSeconds(timeout));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOP_VALUE, params.toArray());
}
@Override
public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
Future<V> res = pollFirstAsync(timeout, unit);
return res.await().getNow();
}
@Override
public Future<V> pollLastAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOP_VALUE, getName(), unit.toSeconds(timeout));
}
@Override
public V pollLast(long timeout, TimeUnit unit) throws InterruptedException {
Future<V> res = pollLastAsync(timeout, unit);
return res.await().getNow();
}
}

@ -123,12 +123,6 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
}
@Override
public V pollLastAndOfferFirstTo(RBlockingQueue<V> queue, long timeout, TimeUnit unit)
throws InterruptedException {
return pollLastAndOfferFirstTo(queue.getName(), timeout, unit);
}
@Override
public Future<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));

@ -26,6 +26,7 @@ import org.redisson.core.NodesGroup;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RBatch;
import org.redisson.core.RBitSet;
import org.redisson.core.RBlockingDeque;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBucket;
import org.redisson.core.RMapCache;
@ -406,6 +407,24 @@ public interface RedissonClient {
*/
<V> RDeque<V> getDeque(String name, Codec codec);
/**
* Returns blocking deque instance by name.
*
* @param name of deque
* @return
*/
<V> RBlockingDeque<V> getBlockingDeque(String name);
/**
* Returns blocking deque instance by name
* using provided codec for deque objects.
*
* @param name of deque
* @param deque objects codec
* @return
*/
<V> RBlockingDeque<V> getBlockingDeque(String name, Codec codec);
/**
* Returns atomicLong instance by name.
*

@ -30,8 +30,8 @@ import org.reactivestreams.Publisher;
public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
/**
* Retrieves and removes first available head of <b>any</b> queue in mode, waiting up to the
* specified wait time if necessary for an element to become available
* Retrieves and removes first available head element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of

@ -128,6 +128,7 @@ public interface RedisCommands {
RedisCommand<Object> RPOPLPUSH = new RedisCommand<Object>("RPOPLPUSH");
RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH");
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor(), 2);
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");

@ -20,10 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -58,7 +55,6 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -424,7 +420,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else {
timeoutRef.get().cancel();
int timeoutTime = connectionManager.getConfig().getTimeout();
if (command.getName().equals(RedisCommands.BLPOP_VALUE.getName())) {
if (command.getName().equals(RedisCommands.BLPOP_VALUE.getName())
|| command.getName().equals(RedisCommands.BRPOP_VALUE.getName())) {
Integer blPopTimeout = Integer.valueOf(params[params.length - 1].toString());
if (blPopTimeout == 0) {
return;

@ -0,0 +1,59 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
/**
* {@link BlockingDeque} backed by Redis
*
* @author Nikita Koksharov
* @param <V> the type of elements held in this collection
*/
public interface RBlockingDeque<V> extends BlockingDeque<V>, RBlockingQueue<V>, RDeque<V>, RBlockingDequeAsync<V> {
/**
* Retrieves and removes first available head element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available tail element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException;
}

@ -0,0 +1,72 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.Future;
/**
* Async interface for {@link BlockingDeque} backed by Redis
*
* @author Nikita Koksharov
* @param <V> the type of elements held in this collection
*/
public interface RBlockingDequeAsync<V> extends RDequeAsync<V>, RBlockingQueueAsync<V> {
/**
* Retrieves and removes first available head element of <b>any</b> queue in async mode,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Future<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames);
/**
* Retrieves and removes first available tail element of <b>any</b> queue in async mode,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Future<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames);
Future<Void> putFirstAsync(V e);
Future<Void> putLastAsync(V e);
Future<V> pollLastAsync(long timeout, TimeUnit unit);
Future<V> takeLastAsync();
Future<V> pollFirstAsync(long timeout, TimeUnit unit);
Future<V> takeFirstAsync();
}

@ -27,8 +27,8 @@ import java.util.concurrent.TimeUnit;
public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockingQueueAsync<V> {
/**
* Retrieves and removes first available head of <b>any</b> queue, waiting up to the
* specified wait time if necessary for an element to become available
* Retrieves and removes first available head element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
@ -43,6 +43,4 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException;
V pollLastAndOfferFirstTo(RBlockingQueue<V> queue, long timeout, TimeUnit unit) throws InterruptedException;
}

@ -29,8 +29,8 @@ import io.netty.util.concurrent.Future;
public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {
/**
* Retrieves and removes first available head of <b>any</b> queue in async mode, waiting up to the
* specified wait time if necessary for an element to become available
* Retrieves and removes first available head element of <b>any</b> queue in async mode,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of

@ -0,0 +1,182 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RBlockingDeque;
public class RedissonBlockingDequeTest extends BaseTest {
@Test
public void testPollLastFromAny() throws InterruptedException {
final RBlockingDeque<Integer> queue1 = redisson.getBlockingDeque("deque:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingDeque<Integer> queue2 = redisson.getBlockingDeque("deque:pollany1");
RBlockingDeque<Integer> queue3 = redisson.getBlockingDeque("deque:pollany2");
try {
queue3.put(2);
queue1.put(1);
queue2.put(3);
} catch (InterruptedException e) {
Assert.fail();
}
}
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollLastFromAny(4, TimeUnit.SECONDS, "deque:pollany1", "deque:pollany2");
assertThat(l).isEqualTo(2);
assertThat(System.currentTimeMillis() - s).isGreaterThan(2000);
}
@Test
public void testFirstLast() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("deque");
deque.putFirst(1);
deque.putFirst(2);
deque.putLast(3);
deque.putLast(4);
assertThat(deque).containsExactly(2, 1, 3, 4);
}
@Test
public void testOfferFirstLast() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("deque");
deque.offerFirst(1);
deque.offerFirst(2);
deque.offerLast(3);
deque.offerLast(4);
assertThat(deque).containsExactly(2, 1, 3, 4);
}
@Test
public void testTakeFirst() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
deque.offerFirst(1);
deque.offerFirst(2);
deque.offerLast(3);
deque.offerLast(4);
assertThat(deque.takeFirst()).isEqualTo(2);
assertThat(deque.takeFirst()).isEqualTo(1);
assertThat(deque.takeFirst()).isEqualTo(3);
assertThat(deque.takeFirst()).isEqualTo(4);
assertThat(deque.size()).isZero();
}
@Test
public void testTakeLast() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
deque.offerFirst(1);
deque.offerFirst(2);
deque.offerLast(3);
deque.offerLast(4);
assertThat(deque.takeLast()).isEqualTo(4);
assertThat(deque.takeLast()).isEqualTo(3);
assertThat(deque.takeLast()).isEqualTo(1);
assertThat(deque.takeLast()).isEqualTo(2);
assertThat(deque.size()).isZero();
}
@Test
public void testTakeFirstAwait() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
try {
deque.putFirst(1);
deque.putFirst(2);
deque.putLast(3);
deque.putLast(4);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(deque.takeFirst()).isEqualTo(1);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(deque.takeFirst()).isEqualTo(2);
assertThat(deque.takeFirst()).isEqualTo(3);
assertThat(deque.takeFirst()).isEqualTo(4);
}
@Test
public void testTakeLastAwait() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("queue:take");
try {
deque.putFirst(1);
deque.putFirst(2);
deque.putLast(3);
deque.putLast(4);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(deque.takeLast()).isEqualTo(1);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(deque.takeLast()).isEqualTo(4);
assertThat(deque.takeLast()).isEqualTo(3);
assertThat(deque.takeLast()).isEqualTo(2);
}
@Test
public void testPollFirst() throws InterruptedException {
RBlockingDeque<Integer> queue1 = redisson.getBlockingDeque("queue1");
queue1.put(1);
queue1.put(2);
queue1.put(3);
assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(2);
assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(3);
long s = System.currentTimeMillis();
assertThat(queue1.pollFirst(5, TimeUnit.SECONDS)).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThan(5000);
}
@Test
public void testPollLast() throws InterruptedException {
RBlockingDeque<Integer> queue1 = redisson.getBlockingDeque("queue1");
queue1.putLast(1);
queue1.putLast(2);
queue1.putLast(3);
assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(3);
assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(2);
assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(1);
long s = System.currentTimeMillis();
assertThat(queue1.pollLast(5, TimeUnit.SECONDS)).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThan(5000);
}
}

@ -70,8 +70,6 @@ public class RedissonBlockingQueueTest extends BaseTest {
Assert.assertEquals(3, l);
Assert.assertTrue(System.currentTimeMillis() - s > 9000);
}
@Test
@ -92,18 +90,19 @@ public class RedissonBlockingQueueTest extends BaseTest {
Assert.assertEquals((Integer)1, queue1.poll(10, TimeUnit.SECONDS));
}
@Test public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1");
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("{queue}1");
queue1.put(1);
queue1.put(2);
queue1.put(3);
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue2");
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("{queue}2");
queue2.put(4);
queue2.put(5);
queue2.put(6);
queue1.pollLastAndOfferFirstTo(queue2, 10, TimeUnit.SECONDS);
queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS);
MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6));
}
@ -129,7 +128,7 @@ public class RedissonBlockingQueueTest extends BaseTest {
queue.add(3);
queue.offer(4);
//MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4));
MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4));
Assert.assertEquals((Integer) 1, queue.poll());
MatcherAssert.assertThat(queue, Matchers.contains(2, 3, 4));
Assert.assertEquals((Integer) 2, queue.element());

Loading…
Cancel
Save