From cad190152f209c757aaac4ab887fce1b89649f6b Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 16 Dec 2016 15:24:37 +0300 Subject: [PATCH] new POC for RedissonBlockingFairQueue --- .../src/main/java/org/redisson/Redisson.java | 4 +- .../redisson/RedissonBlockingFairQueue.java | 611 +++++++++++------- .../org/redisson/api/RBlockingFairQueue.java | 2 +- .../client/protocol/RedisCommands.java | 1 + .../redisson/pubsub/CountDownLatchPubSub.java | 5 + .../java/org/redisson/pubsub/LockPubSub.java | 5 + .../org/redisson/pubsub/PublishSubscribe.java | 6 + .../org/redisson/pubsub/SemaphorePubSub.java | 5 + .../RedissonBlockingFairQueueTest.java | 57 +- 9 files changed, 440 insertions(+), 256 deletions(-) diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index d2cb1de3e..28a4d5d6b 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -428,12 +428,12 @@ public class Redisson implements RedissonClient { @Override public RBlockingFairQueue getBlockingFairQueue(String name) { - return new RedissonBlockingFairQueue(commandExecutor, name, id); + return new RedissonBlockingFairQueue(commandExecutor, name, semaphorePubSub, id); } @Override public RBlockingFairQueue getBlockingFairQueue(String name, Codec codec) { - return new RedissonBlockingFairQueue(codec, commandExecutor, name, id); + return new RedissonBlockingFairQueue(codec, commandExecutor, name, semaphorePubSub, id); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java index 52dcc4789..322eff42c 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java @@ -15,18 +15,23 @@ */ package org.redisson; +import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.redisson.api.RBlockingFairQueue; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; -import org.redisson.misc.RPromise; +import org.redisson.pubsub.SemaphorePubSub; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.PlatformDependent; /** * @@ -35,298 +40,406 @@ import io.netty.util.concurrent.FutureListener; */ public class RedissonBlockingFairQueue extends RedissonBlockingQueue implements RBlockingFairQueue { - private final RedissonFairLock fairLock; + private final Set usedIds = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); + private final UUID id; + private final SemaphorePubSub semaphorePubSub; - protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, UUID id) { + protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) { super(commandExecutor, name); - String lockName = prefixName("redisson_bfq_lock", name); - fairLock = new RedissonFairLock(commandExecutor, lockName, id); + this.semaphorePubSub = semaphorePubSub; + this.id = id; } - protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, UUID id) { + protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) { super(codec, commandExecutor, name); - String lockName = prefixName("redisson_bfq_lock", name); - fairLock = new RedissonFairLock(commandExecutor, lockName, id); + this.semaphorePubSub = semaphorePubSub; + this.id = id; + } + + private String getIdsListName() { + return "{" + getName() + "}:list"; + } + + private String getChannelName() { + return "{" + getName() + "}:" + getCurrentId() + ":channel"; + } + + private RedissonLockEntry getEntry() { + return semaphorePubSub.getEntry(getName() + ":" + getCurrentId()); + } + + private RFuture subscribe() { + return semaphorePubSub.subscribe(getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager()); + } + + private void unsubscribe(RFuture future) { + semaphorePubSub.unsubscribe(future.getNow(), getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager()); } @Override public RFuture deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), fairLock.getName(), fairLock.getThreadsQueueName(), fairLock.getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getIdsListName()); + } + + private boolean tryAcquire() { + return get(tryAcquireAsync()); + } + + private RFuture tryAcquireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local items = redis.call('lrange', KEYS[2], 0, -1) " + + "local found = false; " + + "for i=1,#items do " + + "if items[i] == ARGV[1] then " + + "found = true; " + + "break;" + + "end; " + + "end; " + + "if found == false then " + + "redis.call('rpush', KEYS[2], ARGV[1]); " + + "end; " + + "local value = redis.call('lindex', KEYS[2], 0); " + + "local size = redis.call('llen', KEYS[2]); " + + "if value ~= false and value == ARGV[1] then " + + "if size > 1 then " + + "redis.call('lpop', KEYS[2]);" + + "redis.call('rpush', KEYS[2], value);" + + "local nextValue = redis.call('lindex', KEYS[2], 0); " + + "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);" + + "end; " + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(getName(), getIdsListName()), getCurrentId()); + } + + private String getCurrentId() { + String currentId = id + "-" + Thread.currentThread().getId(); + usedIds.add(currentId); + return currentId; } + @Override public V take() throws InterruptedException { - fairLock.lockInterruptibly(); - try { + if (tryAcquire()) { return super.take(); + } + + RFuture future = subscribe(); + commandExecutor.syncSubscription(future); + try { + while (true) { + if (tryAcquire()) { + return super.take(); + } + + getEntry().getLatch().acquire(1); + } } finally { - fairLock.unlock(); + unsubscribe(future); } } @Override - public RFuture takeAsync() { - final RPromise promise = newPromise(); - final long threadId = Thread.currentThread().getId(); - RFuture lockFuture = fairLock.lockAsync(); - lockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - final RFuture takeFuture = takeAsync(); - takeFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - RFuture unlockFuture = fairLock.unlockAsync(threadId); - unlockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - if (!takeFuture.isSuccess()) { - promise.tryFailure(takeFuture.cause()); - return; - } - - promise.trySuccess(takeFuture.getNow()); - } - }); - } - }); - } - }); - - return promise; + public void destroy() { + commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID_WITH_VALUES, + "for i = 1, #ARGV, 1 do " + + "redis.call('lrem', KEYS[1], 0, ARGV[i]);" + +"end; ", + Collections.singletonList(getIdsListName()), usedIds.toArray()); } +// @Override +// public RFuture takeAsync() { +// final RPromise promise = newPromise(); +// final long threadId = Thread.currentThread().getId(); +// RFuture lockFuture = fairLock.lockAsync(); +// lockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// final RFuture takeFuture = takeAsync(); +// takeFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// RFuture unlockFuture = fairLock.unlockAsync(threadId); +// unlockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// if (!takeFuture.isSuccess()) { +// promise.tryFailure(takeFuture.cause()); +// return; +// } +// +// promise.trySuccess(takeFuture.getNow()); +// } +// }); +// } +// }); +// } +// }); +// +// return promise; +// return null; +// } + @Override public V poll() { - if (fairLock.tryLock()) { - try { - return super.poll(); - } finally { - fairLock.unlock(); - } + if (tryAcquire()) { + return super.poll(); } - return null; - } - - @Override - public RFuture pollAsync() { - final RPromise promise = newPromise(); - final long threadId = Thread.currentThread().getId(); - RFuture tryLockFuture = fairLock.tryLockAsync(); - tryLockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - if (future.getNow()) { - final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(); - pollFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - RFuture unlockFuture = fairLock.unlockAsync(threadId); - unlockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - if (!pollFuture.isSuccess()) { - promise.tryFailure(pollFuture.cause()); - return; - } - - promise.trySuccess(pollFuture.getNow()); - } - }); - } - }); - } else { - promise.trySuccess(null); + RFuture future = subscribe(); + commandExecutor.syncSubscription(future); + try { + while (true) { + if (tryAcquire()) { + return super.poll(); } + + getEntry().getLatch().acquireUninterruptibly(1); } - }); - - return promise; + } finally { + unsubscribe(future); + } } + +// @Override +// public RFuture pollAsync() { +// final RPromise promise = newPromise(); +// final long threadId = Thread.currentThread().getId(); +// RFuture tryLockFuture = fairLock.tryLockAsync(); +// tryLockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// if (future.getNow()) { +// final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(); +// pollFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// RFuture unlockFuture = fairLock.unlockAsync(threadId); +// unlockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// if (!pollFuture.isSuccess()) { +// promise.tryFailure(pollFuture.cause()); +// return; +// } +// +// promise.trySuccess(pollFuture.getNow()); +// } +// }); +// } +// }); +// } else { +// promise.trySuccess(null); +// } +// } +// }); +// +// return promise; +// return null; +// } @Override public V poll(long timeout, TimeUnit unit) throws InterruptedException { long startTime = System.currentTimeMillis(); - if (fairLock.tryLock(timeout, unit)) { - try { - long spentTime = System.currentTimeMillis() - startTime; - long remainTime = unit.toMillis(timeout) - spentTime; - if (remainTime > 0) { - return super.poll(remainTime, TimeUnit.MILLISECONDS); - } - return null; - } finally { - fairLock.unlock(); + if (tryAcquire()) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + return super.poll(remainTime, TimeUnit.MILLISECONDS); } + return null; } - return null; - } - - @Override - public RFuture pollAsync(final long timeout, final TimeUnit unit) { - final long startTime = System.currentTimeMillis(); - final RPromise promise = newPromise(); - final long threadId = Thread.currentThread().getId(); - RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); - tryLockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - if (future.getNow()) { + RFuture future = subscribe(); + commandExecutor.syncSubscription(future); + try { + while (true) { + if (tryAcquire()) { long spentTime = System.currentTimeMillis() - startTime; long remainTime = unit.toMillis(timeout) - spentTime; if (remainTime > 0) { - final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS); - pollFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - RFuture unlockFuture = fairLock.unlockAsync(threadId); - unlockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - if (!pollFuture.isSuccess()) { - promise.tryFailure(pollFuture.cause()); - return; - } - - promise.trySuccess(pollFuture.getNow()); - } - }); - } - }); - } else { - RFuture unlockFuture = fairLock.unlockAsync(threadId); - unlockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - promise.trySuccess(null); - } - }); + return super.poll(remainTime, TimeUnit.MILLISECONDS); } - } else { - promise.trySuccess(null); + return null; } - } - }); - - return promise; - } - @Override - public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { - long startTime = System.currentTimeMillis(); - if (fairLock.tryLock(timeout, unit)) { - try { - long spentTime = System.currentTimeMillis() - startTime; - long remainTime = unit.toMillis(timeout) - spentTime; - if (remainTime > 0) { - return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS); - } - return null; - } finally { - fairLock.unlock(); + getEntry().getLatch().acquire(1); } + } finally { + unsubscribe(future); } - return null; } + +// @Override +// public RFuture pollAsync(final long timeout, final TimeUnit unit) { +// final long startTime = System.currentTimeMillis(); +// final RPromise promise = newPromise(); +// final long threadId = Thread.currentThread().getId(); +// RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); +// tryLockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// if (future.getNow()) { +// long spentTime = System.currentTimeMillis() - startTime; +// long remainTime = unit.toMillis(timeout) - spentTime; +// if (remainTime > 0) { +// final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS); +// pollFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// RFuture unlockFuture = fairLock.unlockAsync(threadId); +// unlockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// if (!pollFuture.isSuccess()) { +// promise.tryFailure(pollFuture.cause()); +// return; +// } +// +// promise.trySuccess(pollFuture.getNow()); +// } +// }); +// } +// }); +// } else { +// RFuture unlockFuture = fairLock.unlockAsync(threadId); +// unlockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// promise.trySuccess(null); +// } +// }); +// } +// } else { +// promise.trySuccess(null); +// } +// } +// }); +// +// return promise; +// return null; +// } - @Override - public RFuture pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) { - final long startTime = System.currentTimeMillis(); - final RPromise promise = newPromise(); - final long threadId = Thread.currentThread().getId(); - RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); - tryLockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - if (future.getNow()) { - long spentTime = System.currentTimeMillis() - startTime; - long remainTime = unit.toMillis(timeout) - spentTime; - if (remainTime > 0) { - final RFuture pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS); - pollFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - RFuture unlockFuture = fairLock.unlockAsync(threadId); - unlockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - if (!pollFuture.isSuccess()) { - promise.tryFailure(pollFuture.cause()); - return; - } - - promise.trySuccess(pollFuture.getNow()); - } - }); - } - }); - } else { - RFuture unlockFuture = fairLock.unlockAsync(threadId); - unlockFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } +// @Override +// public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { +// long startTime = System.currentTimeMillis(); +// if (fairLock.tryLock(timeout, unit)) { +// try { +// long spentTime = System.currentTimeMillis() - startTime; +// long remainTime = unit.toMillis(timeout) - spentTime; +// if (remainTime > 0) { +// return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS); +// } +// return null; +// } finally { +// fairLock.unlock(); +// } +// } +// return null; +// } - promise.trySuccess(null); - } - }); - } - } else { - promise.trySuccess(null); - } - } - }); - - return promise; - } +// @Override +// public RFuture pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) { +// final long startTime = System.currentTimeMillis(); +// final RPromise promise = newPromise(); +// final long threadId = Thread.currentThread().getId(); +// RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); +// tryLockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// if (future.getNow()) { +// long spentTime = System.currentTimeMillis() - startTime; +// long remainTime = unit.toMillis(timeout) - spentTime; +// if (remainTime > 0) { +// final RFuture pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS); +// pollFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// RFuture unlockFuture = fairLock.unlockAsync(threadId); +// unlockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// if (!pollFuture.isSuccess()) { +// promise.tryFailure(pollFuture.cause()); +// return; +// } +// +// promise.trySuccess(pollFuture.getNow()); +// } +// }); +// } +// }); +// } else { +// RFuture unlockFuture = fairLock.unlockAsync(threadId); +// unlockFuture.addListener(new FutureListener() { +// @Override +// public void operationComplete(Future future) throws Exception { +// if (!future.isSuccess()) { +// promise.tryFailure(future.cause()); +// return; +// } +// +// promise.trySuccess(null); +// } +// }); +// } +// } else { +// promise.trySuccess(null); +// } +// } +// }); +// +// return promise; +// return null; +// } } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java index c14c62203..f61fa3a38 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java @@ -23,6 +23,6 @@ package org.redisson.api; * * @param value */ -public interface RBlockingFairQueue extends RBlockingQueue { +public interface RBlockingFairQueue extends RBlockingQueue, RDestroyable { } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 2c02dc0a6..376622d8d 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -213,6 +213,7 @@ public interface RedisCommands { RedisStrictCommand EVAL_INTEGER = new RedisStrictCommand("EVAL", new IntegerReplayConvertor()); RedisStrictCommand EVAL_LONG = new RedisStrictCommand("EVAL"); RedisStrictCommand EVAL_VOID = new RedisStrictCommand("EVAL", new VoidReplayConvertor()); + RedisCommand EVAL_VOID_WITH_VALUES = new RedisCommand("EVAL", new VoidReplayConvertor(), 4, ValueType.OBJECTS); RedisCommand EVAL_VOID_WITH_VALUES_6 = new RedisCommand("EVAL", new VoidReplayConvertor(), 6, ValueType.OBJECTS); RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); RedisCommand> EVAL_SET = new RedisCommand>("EVAL", new ObjectSetReplayDecoder()); diff --git a/redisson/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java b/redisson/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java index 11ab3dd9a..f054c0f5d 100644 --- a/redisson/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java @@ -19,6 +19,11 @@ import org.redisson.RedissonCountDownLatch; import org.redisson.RedissonCountDownLatchEntry; import org.redisson.misc.RPromise; +/** + * + * @author Nikita Koksharov + * + */ public class CountDownLatchPubSub extends PublishSubscribe { @Override diff --git a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java index e7cdfb6ba..f765570f9 100644 --- a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -18,6 +18,11 @@ package org.redisson.pubsub; import org.redisson.RedissonLockEntry; import org.redisson.misc.RPromise; +/** + * + * @author Nikita Koksharov + * + */ public class LockPubSub extends PublishSubscribe { public static final Long unlockMessage = 0L; diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index 801902d46..d15bec4c0 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -30,6 +30,12 @@ import org.redisson.misc.RPromise; import io.netty.util.internal.PlatformDependent; +/** + * + * @author Nikita Koksharov + * + * @param + */ abstract class PublishSubscribe> { private final ConcurrentMap entries = PlatformDependent.newConcurrentHashMap(); diff --git a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java index 85a846b6a..0d3c05490 100644 --- a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java @@ -18,6 +18,11 @@ package org.redisson.pubsub; import org.redisson.RedissonLockEntry; import org.redisson.misc.RPromise; +/** + * + * @author Nikita Koksharov + * + */ public class SemaphorePubSub extends PublishSubscribe { @Override diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java index 4caa89567..a00e00154 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java @@ -17,14 +17,17 @@ public class RedissonBlockingFairQueueTest extends BaseTest { @Test public void testFairness() throws InterruptedException { - int size = 10000; - RBlockingQueue queue = redisson.getBlockingQueue("test"); + int size = 2000; + RBlockingQueue queue = redisson.getBlockingFairQueue("test"); CountDownLatch latch = new CountDownLatch(size); AtomicInteger t1Counter = new AtomicInteger(); AtomicInteger t2Counter = new AtomicInteger(); + AtomicInteger t3Counter = new AtomicInteger(); + AtomicInteger t4Counter = new AtomicInteger(); Thread t1 = new Thread("test-thread1") { public void run() { + RBlockingFairQueue queue = redisson.getBlockingFairQueue("test"); while (true) { try { String a = queue.poll(1, TimeUnit.SECONDS); @@ -36,11 +39,13 @@ public class RedissonBlockingFairQueueTest extends BaseTest { } catch (InterruptedException e) { } } + queue.destroy(); }; }; - Thread t2 = new Thread("test-thread1") { + Thread t2 = new Thread("test-thread2") { public void run() { + RBlockingFairQueue queue = redisson.getBlockingFairQueue("test"); while (true) { try { String a = queue.poll(1, TimeUnit.SECONDS); @@ -53,22 +58,66 @@ public class RedissonBlockingFairQueueTest extends BaseTest { } catch (InterruptedException e) { } } + queue.destroy(); }; }; + RBlockingFairQueue queue34 = redisson.getBlockingFairQueue("test"); + Thread t3 = new Thread("test-thread3") { + public void run() { + while (true) { + try { + String a = queue34.poll(1, TimeUnit.SECONDS); + if (a == null) { + break; + } + Thread.sleep(10); + latch.countDown(); + t3Counter.incrementAndGet(); + } catch (InterruptedException e) { + } + } + }; + }; + + Thread t4 = new Thread("test-thread4") { + public void run() { + while (true) { + try { + String a = queue34.poll(1, TimeUnit.SECONDS); + if (a == null) { + break; + } + latch.countDown(); + t4Counter.incrementAndGet(); + } catch (InterruptedException e) { + } + } + }; + }; + + queue34.destroy(); + + for (int i = 0; i < size; i++) { queue.add("" + i); } t1.start(); t2.start(); + t3.start(); + t4.start(); - t2.join(); t1.join(); + t2.join(); + t3.join(); + t4.join(); assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); System.out.println("t1: " + t1Counter.get()); System.out.println("t2: " + t2Counter.get()); + System.out.println("t3: " + t3Counter.get()); + System.out.println("t4: " + t4Counter.get()); } }