poll npe fixed. #181

pull/189/head
Nikita 10 years ago
parent 3bea04edc1
commit 88206c2ab5

@ -28,6 +28,7 @@ import org.redisson.async.SyncOperation;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.core.RBlockingQueue; import org.redisson.core.RBlockingQueue;
import com.lambdaworks.redis.KeyValue;
import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.RedisConnection;
import org.redisson.core.RScript; import org.redisson.core.RScript;
@ -71,7 +72,11 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return connectionManager.write(getName(), new SyncInterruptedOperation<V, V>() { return connectionManager.write(getName(), new SyncInterruptedOperation<V, V>() {
@Override @Override
public V execute(RedisConnection<Object, V> conn) throws InterruptedException { public V execute(RedisConnection<Object, V> conn) throws InterruptedException {
return conn.blpop(unit.toSeconds(timeout), getName()).value; KeyValue<Object, V> val = conn.blpop(unit.toSeconds(timeout), getName());
if (val != null) {
return val.value;
}
return null;
} }
}); });
} }

@ -15,6 +15,14 @@ import org.redisson.core.*;
public class RedissonBlockingQueueTest extends BaseTest { public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPoll() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1");
queue1.put(1);
Assert.assertEquals((Integer)1, queue1.poll(2, TimeUnit.SECONDS));
Assert.assertNull(queue1.poll(2, TimeUnit.SECONDS));
}
@Test @Test
public void testPollLastAndOfferFirstTo() throws InterruptedException { public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1");

Loading…
Cancel
Save