diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index e0b8ec867..7594e69e7 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -52,6 +52,7 @@ import org.redisson.api.RMap; import org.redisson.api.RMapCache; import org.redisson.api.RPatternTopic; import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RPriorityDeque; import org.redisson.api.RPriorityQueue; import org.redisson.api.RQueue; import org.redisson.api.RReadWriteLock; @@ -614,6 +615,17 @@ public class Redisson implements RedissonClient { public RPriorityQueue getPriorityQueue(String name, Codec codec) { return new RedissonPriorityQueue(codec, connectionManager.getCommandExecutor(), name, this); } + + @Override + public RPriorityDeque getPriorityDeque(String name) { + return new RedissonPriorityDeque(connectionManager.getCommandExecutor(), name, this); + } + + @Override + public RPriorityDeque getPriorityDeque(String name, Codec codec) { + return new RedissonPriorityDeque(codec, connectionManager.getCommandExecutor(), name, this); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java new file mode 100644 index 000000000..155b5ef10 --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java @@ -0,0 +1,243 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.redisson.api.RFuture; +import org.redisson.api.RPriorityDeque; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; +import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; +import org.redisson.command.CommandExecutor; + +/** + * Distributed and concurrent implementation of {@link java.util.Queue} + * + * @author Nikita Koksharov + * + * @param the type of elements held in this collection + */ +public class RedissonPriorityDeque extends RedissonPriorityQueue implements RPriorityDeque { + + private static final RedisCommand RPUSH_VOID = new RedisCommand("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); + private static final RedisCommand LRANGE_SINGLE = new RedisCommand("LRANGE", new ListFirstObjectDecoder()); + + + protected RedissonPriorityDeque(CommandExecutor commandExecutor, String name, Redisson redisson) { + super(commandExecutor, name, redisson); + } + + public RedissonPriorityDeque(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) { + super(codec, commandExecutor, name, redisson); + } + + @Override + public void addFirst(V e) { + get(addFirstAsync(e)); + } + +// @Override + public RFuture addFirstAsync(V e) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPUSH_VOID, getName(), e); + } + + @Override + public void addLast(V e) { + get(addLastAsync(e)); + } + +// @Override + public RFuture addLastAsync(V e) { + return commandExecutor.writeAsync(getName(), codec, RPUSH_VOID, getName(), e); + } + + + @Override + public Iterator descendingIterator() { + return new Iterator() { + + private int currentIndex = size(); + private boolean removeExecuted; + + @Override + public boolean hasNext() { + int size = size(); + return currentIndex > 0 && size > 0; + } + + @Override + public V next() { + if (!hasNext()) { + throw new NoSuchElementException("No such element at index " + currentIndex); + } + currentIndex--; + removeExecuted = false; + return RedissonPriorityDeque.this.get(currentIndex); + } + + @Override + public void remove() { + if (removeExecuted) { + throw new IllegalStateException("Element been already deleted"); + } + RedissonPriorityDeque.this.remove(currentIndex); + currentIndex++; + removeExecuted = true; + } + + }; + } + +// @Override + public RFuture getLastAsync() { + return commandExecutor.readAsync(getName(), codec, LRANGE_SINGLE, getName(), -1, -1); + } + + @Override + public V getLast() { + V result = get(getLastAsync()); + if (result == null) { + throw new NoSuchElementException(); + } + return result; + } + + @Override + public boolean offerFirst(V e) { + return get(offerFirstAsync(e)); + } + +// @Override + public RFuture offerFirstAsync(V e) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPUSH_BOOLEAN, getName(), e); + } + +// @Override + public RFuture offerLastAsync(V e) { + return offerAsync(e); + } + + @Override + public boolean offerLast(V e) { + return get(offerLastAsync(e)); + } + +// @Override + public RFuture peekFirstAsync() { + return getAsync(0); + } + + @Override + public V peekFirst() { + return get(peekFirstAsync()); + } + +// @Override + public RFuture peekLastAsync() { + return getLastAsync(); + } + + @Override + public V peekLast() { + return get(getLastAsync()); + } + +// @Override + public RFuture pollFirstAsync() { + return pollAsync(); + } + + @Override + public V pollFirst() { + return poll(); + } + +// @Override + public RFuture pollLastAsync() { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName()); + } + + + @Override + public V pollLast() { + return get(pollLastAsync()); + } + +// @Override + public RFuture popAsync() { + return pollAsync(); + } + + @Override + public V pop() { + return removeFirst(); + } + +// @Override + public RFuture pushAsync(V e) { + return addFirstAsync(e); + } + + @Override + public void push(V e) { + addFirst(e); + } + +// @Override + public RFuture removeFirstOccurrenceAsync(Object o) { + return removeAsync(o, 1); + } + + @Override + public boolean removeFirstOccurrence(Object o) { + return remove(o, 1); + } + +// @Override + public RFuture removeFirstAsync() { + return pollAsync(); + } + +// @Override + public RFuture removeLastAsync() { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName()); + } + + @Override + public V removeLast() { + V value = get(removeLastAsync()); + if (value == null) { + throw new NoSuchElementException(); + } + return value; + } + +// @Override + public RFuture removeLastOccurrenceAsync(Object o) { + return removeAsync(o, -1); + } + + @Override + public boolean removeLastOccurrence(Object o) { + return remove(o, -1); + } + +} diff --git a/redisson/src/main/java/org/redisson/api/RPriorityDeque.java b/redisson/src/main/java/org/redisson/api/RPriorityDeque.java new file mode 100644 index 000000000..8d29f1eba --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RPriorityDeque.java @@ -0,0 +1,28 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.Deque; + +/** + * + * @author Nikita Koksharov + * + * @param value type + */ +public interface RPriorityDeque extends Deque, RPriorityQueue { + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 82cf912c7..2845c5035 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -547,7 +547,6 @@ public interface RedissonClient { */ RQueue getQueue(String name, Codec codec); - /** * Returns priority unbounded queue instance by name. * It uses comparator to sort objects. @@ -570,6 +569,28 @@ public interface RedissonClient { */ RPriorityQueue getPriorityQueue(String name, Codec codec); + /** + * Returns priority unbounded deque instance by name. + * It uses comparator to sort objects. + * + * @param type of value + * @param name of object + * @return Queue object + */ + RPriorityDeque getPriorityDeque(String name); + + /** + * Returns priority unbounded deque instance by name + * using provided codec for queue objects. + * It uses comparator to sort objects. + * + * @param type of value + * @param name - name of object + * @param codec - codec for message + * @return Queue object + */ + RPriorityDeque getPriorityDeque(String name, Codec codec); + /** * Returns unbounded blocking queue instance by name. *