new POC for RedissonBlockingFairQueue

pull/748/head
Nikita 8 years ago
parent 29f1fe65eb
commit cad190152f

@ -428,12 +428,12 @@ public class Redisson implements RedissonClient {
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name) {
return new RedissonBlockingFairQueue<V>(commandExecutor, name, id);
return new RedissonBlockingFairQueue<V>(commandExecutor, name, semaphorePubSub, id);
}
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name, Codec codec) {
return new RedissonBlockingFairQueue<V>(codec, commandExecutor, name, id);
return new RedissonBlockingFairQueue<V>(codec, commandExecutor, name, semaphorePubSub, id);
}
@Override

@ -15,18 +15,23 @@
*/
package org.redisson;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -35,298 +40,406 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> implements RBlockingFairQueue<V> {
private final RedissonFairLock fairLock;
private final Set<String> usedIds = Collections.newSetFromMap(PlatformDependent.<String, Boolean>newConcurrentHashMap());
private final UUID id;
private final SemaphorePubSub semaphorePubSub;
protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, UUID id) {
protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) {
super(commandExecutor, name);
String lockName = prefixName("redisson_bfq_lock", name);
fairLock = new RedissonFairLock(commandExecutor, lockName, id);
this.semaphorePubSub = semaphorePubSub;
this.id = id;
}
protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, UUID id) {
protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) {
super(codec, commandExecutor, name);
String lockName = prefixName("redisson_bfq_lock", name);
fairLock = new RedissonFairLock(commandExecutor, lockName, id);
this.semaphorePubSub = semaphorePubSub;
this.id = id;
}
private String getIdsListName() {
return "{" + getName() + "}:list";
}
private String getChannelName() {
return "{" + getName() + "}:" + getCurrentId() + ":channel";
}
private RedissonLockEntry getEntry() {
return semaphorePubSub.getEntry(getName() + ":" + getCurrentId());
}
private RFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager());
}
private void unsubscribe(RFuture<RedissonLockEntry> future) {
semaphorePubSub.unsubscribe(future.getNow(), getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager());
}
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), fairLock.getName(), fairLock.getThreadsQueueName(), fairLock.getTimeoutSetName());
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getIdsListName());
}
private boolean tryAcquire() {
return get(tryAcquireAsync());
}
private RFuture<Boolean> tryAcquireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local items = redis.call('lrange', KEYS[2], 0, -1) " +
"local found = false; " +
"for i=1,#items do " +
"if items[i] == ARGV[1] then " +
"found = true; " +
"break;" +
"end; " +
"end; "
+ "if found == false then "
+ "redis.call('rpush', KEYS[2], ARGV[1]); "
+ "end; "
+ "local value = redis.call('lindex', KEYS[2], 0); "
+ "local size = redis.call('llen', KEYS[2]); "
+ "if value ~= false and value == ARGV[1] then "
+ "if size > 1 then "
+ "redis.call('lpop', KEYS[2]);"
+ "redis.call('rpush', KEYS[2], value);"
+ "local nextValue = redis.call('lindex', KEYS[2], 0); "
+ "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);"
+ "end; "
+ "return 1;"
+ "end;" +
"return 0;",
Arrays.<Object>asList(getName(), getIdsListName()), getCurrentId());
}
private String getCurrentId() {
String currentId = id + "-" + Thread.currentThread().getId();
usedIds.add(currentId);
return currentId;
}
@Override
public V take() throws InterruptedException {
fairLock.lockInterruptibly();
try {
if (tryAcquire()) {
return super.take();
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire()) {
return super.take();
}
getEntry().getLatch().acquire(1);
}
} finally {
fairLock.unlock();
unsubscribe(future);
}
}
@Override
public RFuture<V> takeAsync() {
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = fairLock.lockAsync();
lockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
final RFuture<V> takeFuture = takeAsync();
takeFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!takeFuture.isSuccess()) {
promise.tryFailure(takeFuture.cause());
return;
}
promise.trySuccess(takeFuture.getNow());
}
});
}
});
}
});
return promise;
public void destroy() {
commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID_WITH_VALUES,
"for i = 1, #ARGV, 1 do "
+ "redis.call('lrem', KEYS[1], 0, ARGV[i]);"
+"end; ",
Collections.<Object>singletonList(getIdsListName()), usedIds.toArray());
}
// @Override
// public RFuture<V> takeAsync() {
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Void> lockFuture = fairLock.lockAsync();
// lockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// final RFuture<V> takeFuture = takeAsync();
// takeFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!takeFuture.isSuccess()) {
// promise.tryFailure(takeFuture.cause());
// return;
// }
//
// promise.trySuccess(takeFuture.getNow());
// }
// });
// }
// });
// }
// });
//
// return promise;
// return null;
// }
@Override
public V poll() {
if (fairLock.tryLock()) {
try {
return super.poll();
} finally {
fairLock.unlock();
}
if (tryAcquire()) {
return super.poll();
}
return null;
}
@Override
public RFuture<V> pollAsync() {
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync();
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (future.getNow()) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync();
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}
promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
promise.trySuccess(null);
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire()) {
return super.poll();
}
getEntry().getLatch().acquireUninterruptibly(1);
}
});
return promise;
} finally {
unsubscribe(future);
}
}
// @Override
// public RFuture<V> pollAsync() {
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync();
// tryLockFuture.addListener(new FutureListener<Boolean>() {
// @Override
// public void operationComplete(Future<Boolean> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (future.getNow()) {
// final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync();
// pollFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!pollFuture.isSuccess()) {
// promise.tryFailure(pollFuture.cause());
// return;
// }
//
// promise.trySuccess(pollFuture.getNow());
// }
// });
// }
// });
// } else {
// promise.trySuccess(null);
// }
// }
// });
//
// return promise;
// return null;
// }
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
if (fairLock.tryLock(timeout, unit)) {
try {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
}
return null;
} finally {
fairLock.unlock();
if (tryAcquire()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
}
return null;
}
return null;
}
@Override
public RFuture<V> pollAsync(final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (future.getNow()) {
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}
promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(null);
}
});
return super.poll(remainTime, TimeUnit.MILLISECONDS);
}
} else {
promise.trySuccess(null);
return null;
}
}
});
return promise;
}
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
if (fairLock.tryLock(timeout, unit)) {
try {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
}
return null;
} finally {
fairLock.unlock();
getEntry().getLatch().acquire(1);
}
} finally {
unsubscribe(future);
}
return null;
}
// @Override
// public RFuture<V> pollAsync(final long timeout, final TimeUnit unit) {
// final long startTime = System.currentTimeMillis();
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
// tryLockFuture.addListener(new FutureListener<Boolean>() {
// @Override
// public void operationComplete(Future<Boolean> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (future.getNow()) {
// long spentTime = System.currentTimeMillis() - startTime;
// long remainTime = unit.toMillis(timeout) - spentTime;
// if (remainTime > 0) {
// final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
// pollFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!pollFuture.isSuccess()) {
// promise.tryFailure(pollFuture.cause());
// return;
// }
//
// promise.trySuccess(pollFuture.getNow());
// }
// });
// }
// });
// } else {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// promise.trySuccess(null);
// }
// });
// }
// } else {
// promise.trySuccess(null);
// }
// }
// });
//
// return promise;
// return null;
// }
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (future.getNow()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}
promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
// @Override
// public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
// long startTime = System.currentTimeMillis();
// if (fairLock.tryLock(timeout, unit)) {
// try {
// long spentTime = System.currentTimeMillis() - startTime;
// long remainTime = unit.toMillis(timeout) - spentTime;
// if (remainTime > 0) {
// return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
// }
// return null;
// } finally {
// fairLock.unlock();
// }
// }
// return null;
// }
promise.trySuccess(null);
}
});
}
} else {
promise.trySuccess(null);
}
}
});
return promise;
}
// @Override
// public RFuture<V> pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) {
// final long startTime = System.currentTimeMillis();
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
// tryLockFuture.addListener(new FutureListener<Boolean>() {
// @Override
// public void operationComplete(Future<Boolean> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (future.getNow()) {
// long spentTime = System.currentTimeMillis() - startTime;
// long remainTime = unit.toMillis(timeout) - spentTime;
// if (remainTime > 0) {
// final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
// pollFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!pollFuture.isSuccess()) {
// promise.tryFailure(pollFuture.cause());
// return;
// }
//
// promise.trySuccess(pollFuture.getNow());
// }
// });
// }
// });
// } else {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// promise.trySuccess(null);
// }
// });
// }
// } else {
// promise.trySuccess(null);
// }
// }
// });
//
// return promise;
// return null;
// }
}

@ -23,6 +23,6 @@ package org.redisson.api;
*
* @param <V> value
*/
public interface RBlockingFairQueue<V> extends RBlockingQueue<V> {
public interface RBlockingFairQueue<V> extends RBlockingQueue<V>, RDestroyable {
}

@ -213,6 +213,7 @@ public interface RedisCommands {
RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor());
RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL");
RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor());
RedisCommand<Void> EVAL_VOID_WITH_VALUES = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 4, ValueType.OBJECTS);
RedisCommand<Void> EVAL_VOID_WITH_VALUES_6 = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 6, ValueType.OBJECTS);
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>());

@ -19,6 +19,11 @@ import org.redisson.RedissonCountDownLatch;
import org.redisson.RedissonCountDownLatchEntry;
import org.redisson.misc.RPromise;
/**
*
* @author Nikita Koksharov
*
*/
public class CountDownLatchPubSub extends PublishSubscribe<RedissonCountDownLatchEntry> {
@Override

@ -18,6 +18,11 @@ package org.redisson.pubsub;
import org.redisson.RedissonLockEntry;
import org.redisson.misc.RPromise;
/**
*
* @author Nikita Koksharov
*
*/
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
public static final Long unlockMessage = 0L;

@ -30,6 +30,12 @@ import org.redisson.misc.RPromise;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
* @param <E>
*/
abstract class PublishSubscribe<E extends PubSubEntry<E>> {
private final ConcurrentMap<String, E> entries = PlatformDependent.newConcurrentHashMap();

@ -18,6 +18,11 @@ package org.redisson.pubsub;
import org.redisson.RedissonLockEntry;
import org.redisson.misc.RPromise;
/**
*
* @author Nikita Koksharov
*
*/
public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
@Override

@ -17,14 +17,17 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
@Test
public void testFairness() throws InterruptedException {
int size = 10000;
RBlockingQueue<String> queue = redisson.getBlockingQueue("test");
int size = 2000;
RBlockingQueue<String> queue = redisson.getBlockingFairQueue("test");
CountDownLatch latch = new CountDownLatch(size);
AtomicInteger t1Counter = new AtomicInteger();
AtomicInteger t2Counter = new AtomicInteger();
AtomicInteger t3Counter = new AtomicInteger();
AtomicInteger t4Counter = new AtomicInteger();
Thread t1 = new Thread("test-thread1") {
public void run() {
RBlockingFairQueue<String> queue = redisson.getBlockingFairQueue("test");
while (true) {
try {
String a = queue.poll(1, TimeUnit.SECONDS);
@ -36,11 +39,13 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
} catch (InterruptedException e) {
}
}
queue.destroy();
};
};
Thread t2 = new Thread("test-thread1") {
Thread t2 = new Thread("test-thread2") {
public void run() {
RBlockingFairQueue<String> queue = redisson.getBlockingFairQueue("test");
while (true) {
try {
String a = queue.poll(1, TimeUnit.SECONDS);
@ -53,22 +58,66 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
} catch (InterruptedException e) {
}
}
queue.destroy();
};
};
RBlockingFairQueue<String> queue34 = redisson.getBlockingFairQueue("test");
Thread t3 = new Thread("test-thread3") {
public void run() {
while (true) {
try {
String a = queue34.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
Thread.sleep(10);
latch.countDown();
t3Counter.incrementAndGet();
} catch (InterruptedException e) {
}
}
};
};
Thread t4 = new Thread("test-thread4") {
public void run() {
while (true) {
try {
String a = queue34.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
latch.countDown();
t4Counter.incrementAndGet();
} catch (InterruptedException e) {
}
}
};
};
queue34.destroy();
for (int i = 0; i < size; i++) {
queue.add("" + i);
}
t1.start();
t2.start();
t3.start();
t4.start();
t2.join();
t1.join();
t2.join();
t3.join();
t4.join();
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
System.out.println("t1: " + t1Counter.get());
System.out.println("t2: " + t2Counter.get());
System.out.println("t3: " + t3Counter.get());
System.out.println("t4: " + t4Counter.get());
}
}

Loading…
Cancel
Save