RQueue.pollLastAndOfferFirstTo method added. #148

pull/150/head
Nikita 10 years ago
parent 5fa95d2953
commit 3d9a1e4a84

@ -1262,6 +1262,19 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
}
return cmd.getNow();
}
public <T> T awaitInterruptibly(Future<T> cmd, long timeout, TimeUnit unit) throws InterruptedException {
if (!cmd.await(timeout, unit)) {
Promise<T> promise = (Promise<T>)cmd;
RedisTimeoutException ex = new RedisTimeoutException();
promise.setFailure(ex);
throw ex;
}
if (!cmd.isSuccess()) {
throw (RedisException) cmd.cause();
}
return cmd.getNow();
}
public RedisClient getRedisClient() {
return redisClient;

@ -97,9 +97,9 @@ public class RedisConnection<K, V> {
return await(c.bitopXor(destination, keys));
}
public KeyValue<K, V> blpop(long timeout, K... keys) {
public KeyValue<K, V> blpop(long timeout, K... keys) throws InterruptedException {
long timeout2 = (timeout == 0 ? Long.MAX_VALUE : max(timeout, unit.toSeconds(this.timeout)));
return await(c.blpop(timeout, keys), timeout2, SECONDS);
return awaitInterruptibly(c.blpop(timeout, keys), timeout2, SECONDS);
}
public KeyValue<K, V> brpop(long timeout, K... keys) {
@ -107,9 +107,9 @@ public class RedisConnection<K, V> {
return await(c.brpop(timeout, keys), timeout2, SECONDS);
}
public V brpoplpush(long timeout, K source, K destination) {
public V brpoplpush(long timeout, K source, K destination) throws InterruptedException {
long timeout2 = (timeout == 0 ? Long.MAX_VALUE : max(timeout, unit.toSeconds(this.timeout)));
return await(c.brpoplpush(timeout, source, destination), timeout2, SECONDS);
return awaitInterruptibly(c.brpoplpush(timeout, source, destination), timeout2, SECONDS);
}
public K clientGetname() {
@ -859,6 +859,11 @@ public class RedisConnection<K, V> {
private <T> T await(Future<T> future, long timeout, TimeUnit unit) {
return c.await(future, timeout, unit);
}
private <T> T awaitInterruptibly(Future<T> future, long timeout, TimeUnit unit) throws InterruptedException {
return c.awaitInterruptibly(future, timeout, unit);
}
private <T> T await(Future<T> future) {
return c.await(future, timeout, unit);

@ -22,6 +22,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.async.SyncInterruptedOperation;
import org.redisson.async.SyncOperation;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RBlockingQueue;
@ -55,9 +56,9 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public V take() throws InterruptedException {
return connectionManager.write(getName(), new SyncOperation<V, V>() {
return connectionManager.write(getName(), new SyncInterruptedOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
public V execute(RedisConnection<Object, V> conn) throws InterruptedException {
return conn.blpop(0, getName()).value;
}
});
@ -65,14 +66,30 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public V poll(final long timeout, final TimeUnit unit) throws InterruptedException {
return connectionManager.write(getName(), new SyncOperation<V, V>() {
return connectionManager.write(getName(), new SyncInterruptedOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
public V execute(RedisConnection<Object, V> conn) throws InterruptedException {
return conn.blpop(unit.toSeconds(timeout), getName()).value;
}
});
}
@Override
public V pollLastAndOfferFirstTo(RBlockingQueue<V> queue, long timeout, TimeUnit unit)
throws InterruptedException {
return pollLastAndOfferFirstTo(queue.getName(), timeout, unit);
}
@Override
public V pollLastAndOfferFirstTo(final String queueName, final long timeout, final TimeUnit unit) throws InterruptedException {
return connectionManager.write(getName(), new SyncInterruptedOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) throws InterruptedException {
return conn.brpoplpush(unit.toSeconds(timeout), getName(), queueName);
}
});
}
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;

@ -92,4 +92,19 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
return get(0);
}
@Override
public V pollLastAndOfferFirstTo(final String queueName) {
return connectionManager.write(new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.rpoplpush(getName(), queueName);
}
});
}
@Override
public V pollLastAndOfferFirstTo(RQueue<V> queue) {
return pollLastAndOfferFirstTo(queue.getName());
}
}

@ -0,0 +1,24 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.async;
import com.lambdaworks.redis.RedisConnection;
public interface SyncInterruptedOperation<V, R> {
R execute(RedisConnection<Object, V> conn) throws InterruptedException;
}

@ -19,6 +19,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import org.redisson.async.AsyncOperation;
import org.redisson.async.SyncInterruptedOperation;
import org.redisson.async.SyncOperation;
import com.lambdaworks.redis.RedisConnection;
@ -37,7 +38,11 @@ public interface ConnectionManager {
<V, R> R read(String key, SyncOperation<V, R> operation);
<V, R> R read(SyncOperation<V, R> operation);
<V, R> R write(String key, SyncInterruptedOperation<V, R> operation) throws InterruptedException;
<V, R> R write(SyncInterruptedOperation<V, R> operation) throws InterruptedException;
<V, R> R write(String key, SyncOperation<V, R> operation);
<V, R> R write(SyncOperation<V, R> operation);

@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.async.AsyncOperation;
import org.redisson.async.SyncInterruptedOperation;
import org.redisson.async.SyncOperation;
import org.redisson.codec.RedisCodecWrapper;
import org.slf4j.Logger;
@ -257,6 +258,47 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
});
}
public <V, R> R write(String key, SyncInterruptedOperation<V, R> operation) throws InterruptedException {
int slot = calcSlot(key);
return write(slot, operation, 0);
}
public <V, R> R write(SyncInterruptedOperation<V, R> operation) throws InterruptedException {
return write(-1, operation, 0);
}
private <V, R> R write(int slot, SyncInterruptedOperation<V, R> operation, int attempt) throws InterruptedException {
try {
RedisConnection<Object, V> connection = connectionWriteOp(slot);
try {
return operation.execute(connection);
} catch (RedisMovedException e) {
return write(e.getSlot(), operation, attempt);
} catch (RedisTimeoutException e) {
if (attempt == config.getRetryAttempts()) {
throw e;
}
attempt++;
return write(slot, operation, attempt);
} catch (InterruptedException e) {
throw e;
} finally {
releaseWrite(slot, connection);
}
} catch (RedisConnectionException e) {
if (attempt == config.getRetryAttempts()) {
throw e;
}
try {
Thread.sleep(config.getRetryInterval());
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
attempt++;
return write(slot, operation, attempt);
}
}
public <V, R> R write(String key, SyncOperation<V, R> operation) {
int slot = calcSlot(key);

@ -25,4 +25,8 @@ import java.util.concurrent.*;
*/
public interface RBlockingQueue<V> extends BlockingQueue<V>, RExpirable {
V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException;
V pollLastAndOfferFirstTo(RBlockingQueue<V> queue, long timeout, TimeUnit unit) throws InterruptedException;
}

@ -24,6 +24,6 @@ import java.util.Deque;
*
* @param <V> the type of elements held in this collection
*/
public interface RDeque<V> extends Deque<V>, RExpirable {
public interface RDeque<V> extends Deque<V>, RQueue<V> {
}

@ -26,4 +26,8 @@ import java.util.Queue;
*/
public interface RQueue<V> extends Queue<V>, RExpirable {
V pollLastAndOfferFirstTo(String dequeName);
V pollLastAndOfferFirstTo(RQueue<V> deque);
}

@ -15,6 +15,22 @@ import org.redisson.core.*;
public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue1");
queue1.put(1);
queue1.put(2);
queue1.put(3);
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue2");
queue2.put(4);
queue2.put(5);
queue2.put(6);
queue1.pollLastAndOfferFirstTo(queue2, 10, TimeUnit.SECONDS);
MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6));
}
@Test
public void testAddOfferOrigin() {
Queue<Integer> queue = new LinkedList<Integer>();

@ -13,6 +13,22 @@ import org.redisson.core.RDeque;
public class RedissonDequeTest extends BaseTest {
@Test
public void testPollLastAndOfferFirstTo() {
RDeque<Integer> queue1 = redisson.getDeque("deque1");
queue1.addFirst(3);
queue1.addFirst(2);
queue1.addFirst(1);
RDeque<Integer> queue2 = redisson.getDeque("deque2");
queue2.addFirst(6);
queue2.addFirst(5);
queue2.addFirst(4);
queue1.pollLastAndOfferFirstTo(queue2);
MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6));
}
@Test
public void testAddFirstOrigin() {
Deque<Integer> queue = new ArrayDeque<Integer>();

Loading…
Cancel
Save