From 5f2185beeceb04277ec8f7d47c0f02d06047fad8 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 17 Jul 2015 14:50:37 +0300 Subject: [PATCH] RQueueAsync interface added. #186 --- src/main/java/org/redisson/RedissonQueue.java | 17 +++++++- .../org/redisson/core/RCollectionAsync.java | 26 +++++++++++++ .../java/org/redisson/core/RListAsync.java | 21 +--------- src/main/java/org/redisson/core/RQueue.java | 8 +--- .../java/org/redisson/core/RQueueAsync.java | 39 +++++++++++++++++++ 5 files changed, 83 insertions(+), 28 deletions(-) create mode 100644 src/main/java/org/redisson/core/RCollectionAsync.java create mode 100644 src/main/java/org/redisson/core/RQueueAsync.java diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index d7e4b353f..bdbf94bee 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -41,6 +41,11 @@ public class RedissonQueue extends RedissonList implements RQueue { return add(e); } + @Override + public Future offerAsync(V e) { + return addAsync(e); + } + public V getFirst() { V value = connectionManager.read(getName(), RedisCommands.LINDEX, getName(), 0); if (value == null) { @@ -62,9 +67,14 @@ public class RedissonQueue extends RedissonList implements RQueue { return removeFirst(); } + @Override + public Future pollAsync() { + return connectionManager.writeAsync(getName(), RedisCommands.LPOP, getName()); + } + @Override public V poll() { - return connectionManager.write(getName(), RedisCommands.LPOP, getName()); + return connectionManager.get(pollAsync()); } @Override @@ -72,6 +82,11 @@ public class RedissonQueue extends RedissonList implements RQueue { return getFirst(); } + @Override + public Future peekAsync() { + return getAsync(0); + } + @Override public V peek() { if (isEmpty()) { diff --git a/src/main/java/org/redisson/core/RCollectionAsync.java b/src/main/java/org/redisson/core/RCollectionAsync.java new file mode 100644 index 000000000..8d4137d70 --- /dev/null +++ b/src/main/java/org/redisson/core/RCollectionAsync.java @@ -0,0 +1,26 @@ +package org.redisson.core; + +import java.util.Collection; +import java.util.List; + +import io.netty.util.concurrent.Future; + +public interface RCollectionAsync extends RExpirableAsync { + + Future retainAllAsync(Collection c); + + Future removeAllAsync(Collection c); + + Future containsAllAsync(Collection c); + + Future removeAsync(Object o); + + Future> readAllAsync(); + + Future sizeAsync(); + + Future addAsync(V e); + + Future addAllAsync(Collection c); + +} diff --git a/src/main/java/org/redisson/core/RListAsync.java b/src/main/java/org/redisson/core/RListAsync.java index 571ad20ef..a781d4507 100644 --- a/src/main/java/org/redisson/core/RListAsync.java +++ b/src/main/java/org/redisson/core/RListAsync.java @@ -17,9 +17,6 @@ package org.redisson.core; import io.netty.util.concurrent.Future; -import java.util.Collection; -import java.util.List; - /** * Async list functions * @@ -27,7 +24,7 @@ import java.util.List; * * @param the type of elements held in this collection */ -public interface RListAsync extends RExpirableAsync { +public interface RListAsync extends RCollectionAsync { Future lastIndexOfAsync(Object o); @@ -37,22 +34,6 @@ public interface RListAsync extends RExpirableAsync { Future setAsync(int index, V element); - Future retainAllAsync(Collection c); - - Future removeAllAsync(Collection c); - - Future containsAllAsync(Collection c); - - Future removeAsync(Object o); - - Future> readAllAsync(); - - Future sizeAsync(); - Future getAsync(int index); - Future addAsync(V e); - - Future addAllAsync(Collection c); - } diff --git a/src/main/java/org/redisson/core/RQueue.java b/src/main/java/org/redisson/core/RQueue.java index 8d8ab10f8..bd8708e97 100644 --- a/src/main/java/org/redisson/core/RQueue.java +++ b/src/main/java/org/redisson/core/RQueue.java @@ -17,8 +17,6 @@ package org.redisson.core; import java.util.Queue; -import io.netty.util.concurrent.Future; - /** * {@link java.util.Queue} backed by Redis * @@ -26,11 +24,7 @@ import io.netty.util.concurrent.Future; * * @param the type of elements held in this collection */ -public interface RQueue extends Queue, RExpirable { - - Future pollLastAndOfferFirstToAsync(RQueue queue); - - Future pollLastAndOfferFirstToAsync(String queueName); +public interface RQueue extends Queue, RExpirable, RQueueAsync { V pollLastAndOfferFirstTo(String dequeName); diff --git a/src/main/java/org/redisson/core/RQueueAsync.java b/src/main/java/org/redisson/core/RQueueAsync.java new file mode 100644 index 000000000..bac65cca7 --- /dev/null +++ b/src/main/java/org/redisson/core/RQueueAsync.java @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import io.netty.util.concurrent.Future; + +/** + * {@link java.util.Queue} backed by Redis + * + * @author Nikita Koksharov + * + * @param the type of elements held in this collection + */ +public interface RQueueAsync extends RCollectionAsync { + + Future peekAsync(); + + Future pollAsync(); + + Future offerAsync(V e); + + Future pollLastAndOfferFirstToAsync(RQueue queue); + + Future pollLastAndOfferFirstToAsync(String queueName); + +}