RedissonQueue refactoring. #133

pull/139/head
Nikita 10 years ago
parent ee4044afaf
commit a400717b12

@ -1,13 +1,32 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson; package org.redisson;
import java.util.*; import java.util.Collection;
import java.util.concurrent.*; import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.async.*; import org.redisson.async.SyncOperation;
import org.redisson.connection.*; import org.redisson.connection.ConnectionManager;
import org.redisson.core.*; import org.redisson.core.RBlockingQueue;
import com.lambdaworks.redis.*; import com.lambdaworks.redis.RedisConnection;
/** /**
* Offers blocking queue facilities through an intermediary * Offers blocking queue facilities through an intermediary
@ -16,38 +35,12 @@ import com.lambdaworks.redis.*;
* delegated to this intermediary queue. * delegated to this intermediary queue.
* *
* @author pdeschen@gmail.com * @author pdeschen@gmail.com
* @author Nikita Koksharov
*/ */
public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> { public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {
private final static int BLPOP_TIMEOUT_IN_MS = 1000; protected RedissonBlockingQueue(ConnectionManager connection, String name) {
private final LinkedBlockingQueue<V> blockingQueue = new LinkedBlockingQueue();
public RedissonBlockingQueue(final ConnectionManager connection, final String name) {
super(connection, name); super(connection, name);
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
final java.util.concurrent.Future<?> future = executor.submit(new Runnable() {
@Override
public void run() {
while (true) {
V item = connection.write(name, new SyncOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
// Get this timeout from config?
return conn.blpop(BLPOP_TIMEOUT_IN_MS, name).value;
}
});
blockingQueue.add(item);
}
}
});
Runtime.getRuntime().addShutdownHook(new Thread("redis-blpop-shutdown-hook-thread") {
@Override
public void run() {
future.cancel(true);
executor.shutdown();
}
});
} }
@Override @Override
@ -62,26 +55,75 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override @Override
public V take() throws InterruptedException { public V take() throws InterruptedException {
return blockingQueue.take(); return connectionManager.write(getName(), new SyncOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
return conn.blpop(0, getName()).value;
}
});
} }
@Override @Override
public V poll(final long timeout, final TimeUnit unit) throws InterruptedException { public V poll(final long timeout, final TimeUnit unit) throws InterruptedException {
return blockingQueue.poll(timeout, unit); return connectionManager.read(getName(), new SyncOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
return conn.blpop(unit.toSeconds(timeout), getName()).value;
}
});
} }
@Override @Override
public int remainingCapacity() { public int remainingCapacity() {
return blockingQueue.remainingCapacity(); return Integer.MAX_VALUE;
} }
@Override @Override
public int drainTo(Collection<? super V> c) { public int drainTo(Collection<? super V> c) {
return blockingQueue.drainTo(c); List<V> list = connectionManager.write(getName(), new SyncOperation<V, List<V>>() {
@Override
public List<V> execute(RedisConnection<Object, V> conn) {
while (true) {
conn.watch(getName());
conn.multi();
conn.lrange(getName(), 0, -1);
conn.ltrim(getName(), 0, -1);
List<Object> res = conn.exec();
if (res.size() == 2) {
List<V> items = (List<V>) res.get(0);
return items;
}
}
}
});
c.addAll(list);
return list.size();
} }
@Override @Override
public int drainTo(Collection<? super V> c, int maxElements) { public int drainTo(Collection<? super V> c, final int maxElements) {
return blockingQueue.drainTo(c, maxElements); List<V> list = connectionManager.write(getName(), new SyncOperation<V, List<V>>() {
@Override
public List<V> execute(RedisConnection<Object, V> conn) {
while (true) {
conn.watch(getName());
Long len = Math.min(conn.llen(getName()), maxElements);
if (len == 0) {
conn.unwatch();
return Collections.emptyList();
}
conn.multi();
conn.lrange(getName(), 0, len);
conn.ltrim(getName(), 0, len);
List<Object> res = conn.exec();
if (res.size() == 2) {
List<V> items = (List<V>) res.get(0);
return items;
}
}
}
});
c.addAll(list);
return list.size();
} }
} }

@ -75,13 +75,23 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override @Override
public Object[] toArray() { public Object[] toArray() {
List<V> list = subList(0, size()); List<V> list = readAllList();
return list.toArray(); return list.toArray();
} }
protected List<V> readAllList() {
List<V> list = connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.lrange(getName(), 0, -1);
}
});
return list;
}
@Override @Override
public <T> T[] toArray(T[] a) { public <T> T[] toArray(T[] a) {
List<V> list = subList(0, size()); List<V> list = readAllList();
return list.toArray(a); return list.toArray(a);
} }

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core; package org.redisson.core;
import java.util.concurrent.*; import java.util.concurrent.*;

@ -117,8 +117,5 @@ public class RedissonBlockingQueueTest extends BaseTest {
assertThat(counter.get(), equalTo(total)); assertThat(counter.get(), equalTo(total));
queue.delete(); queue.delete();
redisson.shutdown();
} }
} }

Loading…
Cancel
Save