From a400717b126d388225037873e4414026683c02d6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 14 Mar 2015 12:36:04 +0300 Subject: [PATCH] RedissonQueue refactoring. #133 --- .../org/redisson/RedissonBlockingQueue.java | 122 ++++++++++++------ src/main/java/org/redisson/RedissonList.java | 14 +- .../org/redisson/core/RBlockingQueue.java | 15 +++ .../redisson/RedissonBlockingQueueTest.java | 3 - 4 files changed, 109 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/redisson/RedissonBlockingQueue.java b/src/main/java/org/redisson/RedissonBlockingQueue.java index 79faea57e..b2318b8b7 100644 --- a/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -1,13 +1,32 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson; -import java.util.*; -import java.util.concurrent.*; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; -import org.redisson.async.*; -import org.redisson.connection.*; -import org.redisson.core.*; +import org.redisson.async.SyncOperation; +import org.redisson.connection.ConnectionManager; +import org.redisson.core.RBlockingQueue; -import com.lambdaworks.redis.*; +import com.lambdaworks.redis.RedisConnection; /** * Offers blocking queue facilities through an intermediary @@ -16,38 +35,12 @@ import com.lambdaworks.redis.*; * delegated to this intermediary queue. * * @author pdeschen@gmail.com + * @author Nikita Koksharov */ public class RedissonBlockingQueue extends RedissonQueue implements RBlockingQueue { - private final static int BLPOP_TIMEOUT_IN_MS = 1000; - - private final LinkedBlockingQueue blockingQueue = new LinkedBlockingQueue(); - - public RedissonBlockingQueue(final ConnectionManager connection, final String name) { + protected RedissonBlockingQueue(ConnectionManager connection, String name) { super(connection, name); - final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - final java.util.concurrent.Future future = executor.submit(new Runnable() { - @Override - public void run() { - while (true) { - V item = connection.write(name, new SyncOperation() { - @Override - public V execute(RedisConnection conn) { - // Get this timeout from config? - return conn.blpop(BLPOP_TIMEOUT_IN_MS, name).value; - } - }); - blockingQueue.add(item); - } - } - }); - Runtime.getRuntime().addShutdownHook(new Thread("redis-blpop-shutdown-hook-thread") { - @Override - public void run() { - future.cancel(true); - executor.shutdown(); - } - }); } @Override @@ -62,26 +55,75 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public V take() throws InterruptedException { - return blockingQueue.take(); + return connectionManager.write(getName(), new SyncOperation() { + @Override + public V execute(RedisConnection conn) { + return conn.blpop(0, getName()).value; + } + }); } @Override public V poll(final long timeout, final TimeUnit unit) throws InterruptedException { - return blockingQueue.poll(timeout, unit); + return connectionManager.read(getName(), new SyncOperation() { + @Override + public V execute(RedisConnection conn) { + return conn.blpop(unit.toSeconds(timeout), getName()).value; + } + }); } @Override public int remainingCapacity() { - return blockingQueue.remainingCapacity(); + return Integer.MAX_VALUE; } @Override public int drainTo(Collection c) { - return blockingQueue.drainTo(c); + List list = connectionManager.write(getName(), new SyncOperation>() { + @Override + public List execute(RedisConnection conn) { + while (true) { + conn.watch(getName()); + conn.multi(); + conn.lrange(getName(), 0, -1); + conn.ltrim(getName(), 0, -1); + List res = conn.exec(); + if (res.size() == 2) { + List items = (List) res.get(0); + return items; + } + } + } + }); + c.addAll(list); + return list.size(); } @Override - public int drainTo(Collection c, int maxElements) { - return blockingQueue.drainTo(c, maxElements); + public int drainTo(Collection c, final int maxElements) { + List list = connectionManager.write(getName(), new SyncOperation>() { + @Override + public List execute(RedisConnection conn) { + while (true) { + conn.watch(getName()); + Long len = Math.min(conn.llen(getName()), maxElements); + if (len == 0) { + conn.unwatch(); + return Collections.emptyList(); + } + conn.multi(); + conn.lrange(getName(), 0, len); + conn.ltrim(getName(), 0, len); + List res = conn.exec(); + if (res.size() == 2) { + List items = (List) res.get(0); + return items; + } + } + } + }); + c.addAll(list); + return list.size(); } } \ No newline at end of file diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index 54d88ff06..d34274fe7 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -75,13 +75,23 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Object[] toArray() { - List list = subList(0, size()); + List list = readAllList(); return list.toArray(); } + protected List readAllList() { + List list = connectionManager.read(getName(), new ResultOperation, V>() { + @Override + protected Future> execute(RedisAsyncConnection async) { + return async.lrange(getName(), 0, -1); + } + }); + return list; + } + @Override public T[] toArray(T[] a) { - List list = subList(0, size()); + List list = readAllList(); return list.toArray(a); } diff --git a/src/main/java/org/redisson/core/RBlockingQueue.java b/src/main/java/org/redisson/core/RBlockingQueue.java index b7ae5f32e..b8a534517 100644 --- a/src/main/java/org/redisson/core/RBlockingQueue.java +++ b/src/main/java/org/redisson/core/RBlockingQueue.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.core; import java.util.concurrent.*; diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index ea9ad77cd..421c14562 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -117,8 +117,5 @@ public class RedissonBlockingQueueTest extends BaseTest { assertThat(counter.get(), equalTo(total)); queue.delete(); - - redisson.shutdown(); - } }