From 3d9a1e4a84afd7f5c56021bed261402168bfe415 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 24 Apr 2015 17:08:51 +0300 Subject: [PATCH] RQueue.pollLastAndOfferFirstTo method added. #148 --- .../redis/RedisAsyncConnection.java | 13 ++++++ .../lambdaworks/redis/RedisConnection.java | 13 ++++-- .../org/redisson/RedissonBlockingQueue.java | 25 +++++++++-- src/main/java/org/redisson/RedissonQueue.java | 15 +++++++ .../async/SyncInterruptedOperation.java | 24 +++++++++++ .../connection/ConnectionManager.java | 7 +++- .../MasterSlaveConnectionManager.java | 42 +++++++++++++++++++ .../org/redisson/core/RBlockingQueue.java | 4 ++ src/main/java/org/redisson/core/RDeque.java | 2 +- src/main/java/org/redisson/core/RQueue.java | 4 ++ .../redisson/RedissonBlockingQueueTest.java | 16 +++++++ .../java/org/redisson/RedissonDequeTest.java | 16 +++++++ 12 files changed, 171 insertions(+), 10 deletions(-) create mode 100644 src/main/java/org/redisson/async/SyncInterruptedOperation.java diff --git a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java index e7b7a53fc..f34a6e8ee 100644 --- a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java @@ -1262,6 +1262,19 @@ public class RedisAsyncConnection extends ChannelInboundHandlerAdapter { } return cmd.getNow(); } + + public T awaitInterruptibly(Future cmd, long timeout, TimeUnit unit) throws InterruptedException { + if (!cmd.await(timeout, unit)) { + Promise promise = (Promise)cmd; + RedisTimeoutException ex = new RedisTimeoutException(); + promise.setFailure(ex); + throw ex; + } + if (!cmd.isSuccess()) { + throw (RedisException) cmd.cause(); + } + return cmd.getNow(); + } public RedisClient getRedisClient() { return redisClient; diff --git a/src/main/java/com/lambdaworks/redis/RedisConnection.java b/src/main/java/com/lambdaworks/redis/RedisConnection.java index 25787fc53..3ee1927c8 100644 --- a/src/main/java/com/lambdaworks/redis/RedisConnection.java +++ b/src/main/java/com/lambdaworks/redis/RedisConnection.java @@ -97,9 +97,9 @@ public class RedisConnection { return await(c.bitopXor(destination, keys)); } - public KeyValue blpop(long timeout, K... keys) { + public KeyValue blpop(long timeout, K... keys) throws InterruptedException { long timeout2 = (timeout == 0 ? Long.MAX_VALUE : max(timeout, unit.toSeconds(this.timeout))); - return await(c.blpop(timeout, keys), timeout2, SECONDS); + return awaitInterruptibly(c.blpop(timeout, keys), timeout2, SECONDS); } public KeyValue brpop(long timeout, K... keys) { @@ -107,9 +107,9 @@ public class RedisConnection { return await(c.brpop(timeout, keys), timeout2, SECONDS); } - public V brpoplpush(long timeout, K source, K destination) { + public V brpoplpush(long timeout, K source, K destination) throws InterruptedException { long timeout2 = (timeout == 0 ? Long.MAX_VALUE : max(timeout, unit.toSeconds(this.timeout))); - return await(c.brpoplpush(timeout, source, destination), timeout2, SECONDS); + return awaitInterruptibly(c.brpoplpush(timeout, source, destination), timeout2, SECONDS); } public K clientGetname() { @@ -859,6 +859,11 @@ public class RedisConnection { private T await(Future future, long timeout, TimeUnit unit) { return c.await(future, timeout, unit); } + + private T awaitInterruptibly(Future future, long timeout, TimeUnit unit) throws InterruptedException { + return c.awaitInterruptibly(future, timeout, unit); + } + private T await(Future future) { return c.await(future, timeout, unit); diff --git a/src/main/java/org/redisson/RedissonBlockingQueue.java b/src/main/java/org/redisson/RedissonBlockingQueue.java index 7399fb201..c8840a71f 100644 --- a/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -22,6 +22,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.redisson.async.SyncInterruptedOperation; import org.redisson.async.SyncOperation; import org.redisson.connection.ConnectionManager; import org.redisson.core.RBlockingQueue; @@ -55,9 +56,9 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public V take() throws InterruptedException { - return connectionManager.write(getName(), new SyncOperation() { + return connectionManager.write(getName(), new SyncInterruptedOperation() { @Override - public V execute(RedisConnection conn) { + public V execute(RedisConnection conn) throws InterruptedException { return conn.blpop(0, getName()).value; } }); @@ -65,14 +66,30 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public V poll(final long timeout, final TimeUnit unit) throws InterruptedException { - return connectionManager.write(getName(), new SyncOperation() { + return connectionManager.write(getName(), new SyncInterruptedOperation() { @Override - public V execute(RedisConnection conn) { + public V execute(RedisConnection conn) throws InterruptedException { return conn.blpop(unit.toSeconds(timeout), getName()).value; } }); } + @Override + public V pollLastAndOfferFirstTo(RBlockingQueue queue, long timeout, TimeUnit unit) + throws InterruptedException { + return pollLastAndOfferFirstTo(queue.getName(), timeout, unit); + } + + @Override + public V pollLastAndOfferFirstTo(final String queueName, final long timeout, final TimeUnit unit) throws InterruptedException { + return connectionManager.write(getName(), new SyncInterruptedOperation() { + @Override + public V execute(RedisConnection conn) throws InterruptedException { + return conn.brpoplpush(unit.toSeconds(timeout), getName(), queueName); + } + }); + } + @Override public int remainingCapacity() { return Integer.MAX_VALUE; diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index 8f9cf74ab..e6f36a830 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -92,4 +92,19 @@ public class RedissonQueue extends RedissonList implements RQueue { return get(0); } + @Override + public V pollLastAndOfferFirstTo(final String queueName) { + return connectionManager.write(new ResultOperation() { + @Override + protected Future execute(RedisAsyncConnection async) { + return async.rpoplpush(getName(), queueName); + } + }); + } + + @Override + public V pollLastAndOfferFirstTo(RQueue queue) { + return pollLastAndOfferFirstTo(queue.getName()); + } + } diff --git a/src/main/java/org/redisson/async/SyncInterruptedOperation.java b/src/main/java/org/redisson/async/SyncInterruptedOperation.java new file mode 100644 index 000000000..3a36a4265 --- /dev/null +++ b/src/main/java/org/redisson/async/SyncInterruptedOperation.java @@ -0,0 +1,24 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.async; + +import com.lambdaworks.redis.RedisConnection; + +public interface SyncInterruptedOperation { + + R execute(RedisConnection conn) throws InterruptedException; + +} diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 25f731585..e1eb2eca1 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -19,6 +19,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Future; import org.redisson.async.AsyncOperation; +import org.redisson.async.SyncInterruptedOperation; import org.redisson.async.SyncOperation; import com.lambdaworks.redis.RedisConnection; @@ -37,7 +38,11 @@ public interface ConnectionManager { R read(String key, SyncOperation operation); R read(SyncOperation operation); - + + R write(String key, SyncInterruptedOperation operation) throws InterruptedException; + + R write(SyncInterruptedOperation operation) throws InterruptedException; + R write(String key, SyncOperation operation); R write(SyncOperation operation); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 3b36c0ed5..f9f3c6ef5 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.redisson.Config; import org.redisson.MasterSlaveServersConfig; import org.redisson.async.AsyncOperation; +import org.redisson.async.SyncInterruptedOperation; import org.redisson.async.SyncOperation; import org.redisson.codec.RedisCodecWrapper; import org.slf4j.Logger; @@ -257,6 +258,47 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } }); } + + public R write(String key, SyncInterruptedOperation operation) throws InterruptedException { + int slot = calcSlot(key); + return write(slot, operation, 0); + } + + public R write(SyncInterruptedOperation operation) throws InterruptedException { + return write(-1, operation, 0); + } + + private R write(int slot, SyncInterruptedOperation operation, int attempt) throws InterruptedException { + try { + RedisConnection connection = connectionWriteOp(slot); + try { + return operation.execute(connection); + } catch (RedisMovedException e) { + return write(e.getSlot(), operation, attempt); + } catch (RedisTimeoutException e) { + if (attempt == config.getRetryAttempts()) { + throw e; + } + attempt++; + return write(slot, operation, attempt); + } catch (InterruptedException e) { + throw e; + } finally { + releaseWrite(slot, connection); + } + } catch (RedisConnectionException e) { + if (attempt == config.getRetryAttempts()) { + throw e; + } + try { + Thread.sleep(config.getRetryInterval()); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + attempt++; + return write(slot, operation, attempt); + } + } public R write(String key, SyncOperation operation) { int slot = calcSlot(key); diff --git a/src/main/java/org/redisson/core/RBlockingQueue.java b/src/main/java/org/redisson/core/RBlockingQueue.java index b8a534517..6c88bfb0d 100644 --- a/src/main/java/org/redisson/core/RBlockingQueue.java +++ b/src/main/java/org/redisson/core/RBlockingQueue.java @@ -25,4 +25,8 @@ import java.util.concurrent.*; */ public interface RBlockingQueue extends BlockingQueue, RExpirable { + V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException; + + V pollLastAndOfferFirstTo(RBlockingQueue queue, long timeout, TimeUnit unit) throws InterruptedException; + } diff --git a/src/main/java/org/redisson/core/RDeque.java b/src/main/java/org/redisson/core/RDeque.java index 88abbb521..ceefbc2b0 100644 --- a/src/main/java/org/redisson/core/RDeque.java +++ b/src/main/java/org/redisson/core/RDeque.java @@ -24,6 +24,6 @@ import java.util.Deque; * * @param the type of elements held in this collection */ -public interface RDeque extends Deque, RExpirable { +public interface RDeque extends Deque, RQueue { } diff --git a/src/main/java/org/redisson/core/RQueue.java b/src/main/java/org/redisson/core/RQueue.java index 4dd62ed32..201409010 100644 --- a/src/main/java/org/redisson/core/RQueue.java +++ b/src/main/java/org/redisson/core/RQueue.java @@ -26,4 +26,8 @@ import java.util.Queue; */ public interface RQueue extends Queue, RExpirable { + V pollLastAndOfferFirstTo(String dequeName); + + V pollLastAndOfferFirstTo(RQueue deque); + } diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 421c14562..3c39d7490 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -15,6 +15,22 @@ import org.redisson.core.*; public class RedissonBlockingQueueTest extends BaseTest { + @Test + public void testPollLastAndOfferFirstTo() throws InterruptedException { + RBlockingQueue queue1 = redisson.getBlockingQueue("queue1"); + queue1.put(1); + queue1.put(2); + queue1.put(3); + + RBlockingQueue queue2 = redisson.getBlockingQueue("queue2"); + queue2.put(4); + queue2.put(5); + queue2.put(6); + + queue1.pollLastAndOfferFirstTo(queue2, 10, TimeUnit.SECONDS); + MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6)); + } + @Test public void testAddOfferOrigin() { Queue queue = new LinkedList(); diff --git a/src/test/java/org/redisson/RedissonDequeTest.java b/src/test/java/org/redisson/RedissonDequeTest.java index 59476aed1..b02cbb34e 100644 --- a/src/test/java/org/redisson/RedissonDequeTest.java +++ b/src/test/java/org/redisson/RedissonDequeTest.java @@ -13,6 +13,22 @@ import org.redisson.core.RDeque; public class RedissonDequeTest extends BaseTest { + @Test + public void testPollLastAndOfferFirstTo() { + RDeque queue1 = redisson.getDeque("deque1"); + queue1.addFirst(3); + queue1.addFirst(2); + queue1.addFirst(1); + + RDeque queue2 = redisson.getDeque("deque2"); + queue2.addFirst(6); + queue2.addFirst(5); + queue2.addFirst(4); + + queue1.pollLastAndOfferFirstTo(queue2); + MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6)); + } + @Test public void testAddFirstOrigin() { Deque queue = new ArrayDeque();