diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java index 7d5f08a1d..c82214cf8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java @@ -19,7 +19,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.RBlockingFairQueue; import org.redisson.api.RFuture; @@ -28,8 +30,14 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; +import org.redisson.misc.RPromise; import org.redisson.pubsub.SemaphorePubSub; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + /** * * @author Nikita Koksharov @@ -37,6 +45,8 @@ import org.redisson.pubsub.SemaphorePubSub; */ public class RedissonBlockingFairQueue extends RedissonBlockingQueue implements RBlockingFairQueue { + public static final long TIMEOUT_SECONDS = 30; + private final UUID id; private final AtomicInteger instances = new AtomicInteger(); private final SemaphorePubSub semaphorePubSub; @@ -59,6 +69,10 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple return suffixName(getName(), "list"); } + private String getTimeoutName() { + return suffixName(getName(), "timeout"); + } + private String getChannelName() { return suffixName(getName(), getCurrentId() + ":channel"); } @@ -77,39 +91,55 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple @Override public RFuture deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getIdsListName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getIdsListName(), getTimeoutName()); } - private boolean tryAcquire() { + private Long tryAcquire() { return get(tryAcquireAsync()); } - private RFuture tryAcquireAsync() { - return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local items = redis.call('lrange', KEYS[2], 0, -1) " + - "local found = false; " + - "for i=1,#items do " + - "if items[i] == ARGV[1] then " + - "found = true; " + - "break;" + - "end; " + - "end; " + private RFuture tryAcquireAsync() { + long timeout = System.currentTimeMillis() + TIMEOUT_SECONDS*1000; + + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + + "local timeout = redis.call('get', KEYS[3]);" + + "if timeout ~= false and tonumber(timeout) <= tonumber(ARGV[3]) then " + + "redis.call('lpop', KEYS[2]); " + + "local nextValue = redis.call('lindex', KEYS[2], 0); " + + "if nextValue ~= false and nextValue ~= ARGV[1] then " + + "redis.call('set', KEYS[3], ARGV[2]);" + + "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);" + + "end; " + + "end; " + + + "local items = redis.call('lrange', KEYS[2], 0, -1) " + + "local found = false; " + + "for i=1,#items do " + + "if items[i] == ARGV[1] then " + + "found = true; " + + "break;" + + "end; " + + "end; " + + "if found == false then " + "redis.call('lpush', KEYS[2], ARGV[1]); " + "end; " + + "local value = redis.call('lindex', KEYS[2], 0); " - + "local size = redis.call('llen', KEYS[2]); " - + "if value ~= false and value == ARGV[1] then " + + "if value == ARGV[1] then " + + "redis.call('set', KEYS[3], ARGV[2]);" + + "local size = redis.call('llen', KEYS[2]); " + "if size > 1 then " + "redis.call('lpop', KEYS[2]);" + "redis.call('rpush', KEYS[2], value);" + "local nextValue = redis.call('lindex', KEYS[2], 0); " + "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);" + "end; " - + "return 1;" - + "end;" + - "return 0;", - Arrays.asList(getName(), getIdsListName()), getCurrentId()); + + "return nil;" + + "end;" + + "return tonumber(timeout) - tonumber(ARGV[3]);", + Arrays.asList(getName(), getIdsListName(), getTimeoutName()), getCurrentId(), timeout, System.currentTimeMillis()); } private String getCurrentId() { @@ -119,7 +149,8 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple @Override public V take() throws InterruptedException { - if (tryAcquire()) { + Long currentTimeout = tryAcquire(); + if (currentTimeout == null) { return super.take(); } @@ -127,11 +158,12 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple commandExecutor.syncSubscription(future); try { while (true) { - if (tryAcquire()) { + currentTimeout = tryAcquire(); + if (currentTimeout == null) { return super.take(); } - getEntry().getLatch().acquire(1); + getEntry().getLatch().tryAcquire(currentTimeout, TimeUnit.MILLISECONDS); } } finally { unsubscribe(future); @@ -149,145 +181,424 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple } } -// @Override -// public RFuture takeAsync() { -// final RPromise promise = newPromise(); -// final long threadId = Thread.currentThread().getId(); -// RFuture lockFuture = fairLock.lockAsync(); -// lockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// final RFuture takeFuture = takeAsync(); -// takeFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// RFuture unlockFuture = fairLock.unlockAsync(threadId); -// unlockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// if (!takeFuture.isSuccess()) { -// promise.tryFailure(takeFuture.cause()); -// return; -// } -// -// promise.trySuccess(takeFuture.getNow()); -// } -// }); -// } -// }); -// } -// }); -// -// return promise; -// return null; -// } + @Override + public RFuture takeAsync() { + final RPromise promise = newPromise(); + + RFuture tryAcquireFuture = tryAcquireAsync(); + tryAcquireFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + final Long currentTimeout = future.getNow(); + if (currentTimeout == null) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.takeAsync(); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + }); + } else { + final RFuture subscribeFuture = subscribe(); + final AtomicReference futureRef = new AtomicReference(); + subscribeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (futureRef.get() != null) { + futureRef.get().cancel(); + } + + tryTakeAsync(subscribeFuture, promise); + } + }); + } + } + }); + + return promise; + } @Override public V poll() { - if (tryAcquire()) { + Long currentTimeout = tryAcquire(); + if (currentTimeout == null) { return super.poll(); } + return null; + } + + @Override + public RFuture pollAsync() { + final RPromise promise = newPromise(); + + RFuture tryAcquireFuture = tryAcquireAsync(); + tryAcquireFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + final Long currentTimeout = future.getNow(); + if (currentTimeout == null) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + }); + } else { + promise.trySuccess(null); + } + } + }); + + return promise; + } + + @Override + public V poll(long timeout, TimeUnit unit) throws InterruptedException { + long startTime = System.currentTimeMillis(); + Long currentTimeout = tryAcquire(); + if (currentTimeout == null) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + return super.poll(remainTime, TimeUnit.MILLISECONDS); + } + return null; + } + RFuture future = subscribe(); - commandExecutor.syncSubscription(future); + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (!future.awaitUninterruptibly(remainTime, TimeUnit.MILLISECONDS)) { + return null; + } + try { while (true) { - if (tryAcquire()) { - return super.poll(); + currentTimeout = tryAcquire(); + if (currentTimeout == null) { + spentTime = System.currentTimeMillis() - startTime; + remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + return super.poll(remainTime, TimeUnit.MILLISECONDS); + } + return null; } - getEntry().getLatch().acquireUninterruptibly(1); + spentTime = System.currentTimeMillis() - startTime; + remainTime = unit.toMillis(timeout) - spentTime; + remainTime = Math.min(remainTime, currentTimeout); + if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) { + return null; + } } } finally { unsubscribe(future); } } -// @Override -// public RFuture pollAsync() { -// final RPromise promise = newPromise(); -// final long threadId = Thread.currentThread().getId(); -// RFuture tryLockFuture = fairLock.tryLockAsync(); -// tryLockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// if (future.getNow()) { -// final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(); -// pollFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// RFuture unlockFuture = fairLock.unlockAsync(threadId); -// unlockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// if (!pollFuture.isSuccess()) { -// promise.tryFailure(pollFuture.cause()); -// return; -// } -// -// promise.trySuccess(pollFuture.getNow()); -// } -// }); -// } -// }); -// } else { -// promise.trySuccess(null); -// } -// } -// }); -// -// return promise; -// return null; -// } + @Override + public RFuture pollAsync(final long timeout, final TimeUnit unit) { + final long startTime = System.currentTimeMillis(); + final RPromise promise = newPromise(); + + RFuture tryAcquireFuture = tryAcquireAsync(); + tryAcquireFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + Long currentTimeout = future.getNow(); + if (currentTimeout == null) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + }); + } else { + promise.trySuccess(null); + } + } else { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + remainTime = Math.min(remainTime, currentTimeout); + if (remainTime <= 0) { + promise.trySuccess(null); + return; + } + + final RFuture subscribeFuture = subscribe(); + final AtomicReference futureRef = new AtomicReference(); + subscribeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (futureRef.get() != null) { + futureRef.get().cancel(); + } + tryPollAsync(startTime, timeout, unit, subscribeFuture, promise); + } + }); + if (!subscribeFuture.isDone()) { + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (!subscribeFuture.isDone()) { + subscribeFuture.cancel(false); + promise.trySuccess(null); + } + } + }, remainTime, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); + } + } + } + }); + + return promise; + } + + private void tryTakeAsync(final RFuture subscribeFuture, final RPromise promise) { + if (promise.isDone()) { + unsubscribe(subscribeFuture); + return; + } + + RFuture tryAcquireFuture = tryAcquireAsync(); + tryAcquireFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + unsubscribe(subscribeFuture); + promise.tryFailure(future.cause()); + return; + } + + Long currentTimeout = future.getNow(); + if (currentTimeout == null) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.takeAsync(); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + unsubscribe(subscribeFuture); + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + }); + } else { + final RedissonLockEntry entry = getEntry(); + synchronized (entry) { + if (entry.getLatch().tryAcquire()) { + tryTakeAsync(subscribeFuture, promise); + } else { + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicReference futureRef = new AtomicReference(); + final Runnable listener = new Runnable() { + @Override + public void run() { + executed.set(true); + if (futureRef.get() != null) { + futureRef.get().cancel(); + } + + tryTakeAsync(subscribeFuture, promise); + } + }; + entry.addListener(listener); + + if (!executed.get()) { + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout t) throws Exception { + synchronized (entry) { + if (entry.removeListener(listener)) { + tryTakeAsync(subscribeFuture, promise); + } + } + } + }, currentTimeout, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); + } + } + } + } + }; + }); + } + + private void tryPollAsync(final long startTime, final long timeout, final TimeUnit unit, + final RFuture subscribeFuture, final RPromise promise) { + if (promise.isDone()) { + unsubscribe(subscribeFuture); + return; + } + + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime <= 0) { + unsubscribe(subscribeFuture); + promise.trySuccess(null); + return; + } + + RFuture tryAcquireFuture = tryAcquireAsync(); + tryAcquireFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + unsubscribe(subscribeFuture); + promise.tryFailure(future.cause()); + return; + } + + Long currentTimeout = future.getNow(); + if (currentTimeout == null) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + unsubscribe(subscribeFuture); + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + }); + } else { + unsubscribe(subscribeFuture); + promise.trySuccess(null); + } + } else { + final RedissonLockEntry entry = getEntry(); + synchronized (entry) { + if (entry.getLatch().tryAcquire()) { + tryPollAsync(startTime, timeout, unit, subscribeFuture, promise); + } else { + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicReference futureRef = new AtomicReference(); + final Runnable listener = new Runnable() { + @Override + public void run() { + executed.set(true); + if (futureRef.get() != null) { + futureRef.get().cancel(); + } + + tryPollAsync(startTime, timeout, unit, subscribeFuture, promise); + } + }; + entry.addListener(listener); + + if (!executed.get()) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout t) throws Exception { + synchronized (entry) { + if (entry.removeListener(listener)) { + tryPollAsync(startTime, timeout, unit, subscribeFuture, promise); + } + } + } + }, remainTime, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); + } + } + } + } + }; + }); + } @Override - public V poll(long timeout, TimeUnit unit) throws InterruptedException { + public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { long startTime = System.currentTimeMillis(); - if (tryAcquire()) { + Long currentTimeout = tryAcquire(); + if (currentTimeout == null) { long spentTime = System.currentTimeMillis() - startTime; long remainTime = unit.toMillis(timeout) - spentTime; if (remainTime > 0) { - return super.poll(remainTime, TimeUnit.MILLISECONDS); + return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS); } return null; } RFuture future = subscribe(); - commandExecutor.syncSubscription(future); + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (!future.awaitUninterruptibly(remainTime, TimeUnit.MILLISECONDS)) { + return null; + } + try { while (true) { - if (tryAcquire()) { - long spentTime = System.currentTimeMillis() - startTime; - long remainTime = unit.toMillis(timeout) - spentTime; + currentTimeout = tryAcquire(); + if (currentTimeout == null) { + spentTime = System.currentTimeMillis() - startTime; + remainTime = unit.toMillis(timeout) - spentTime; if (remainTime > 0) { - return super.poll(remainTime, TimeUnit.MILLISECONDS); + return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS); } return null; } - long spentTime = System.currentTimeMillis() - startTime; - long remainTime = unit.toMillis(timeout) - spentTime; + spentTime = System.currentTimeMillis() - startTime; + remainTime = unit.toMillis(timeout) - spentTime; + remainTime = Math.min(remainTime, currentTimeout); if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) { return null; } @@ -296,153 +607,175 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple unsubscribe(future); } } - -// @Override -// public RFuture pollAsync(final long timeout, final TimeUnit unit) { -// final long startTime = System.currentTimeMillis(); -// final RPromise promise = newPromise(); -// final long threadId = Thread.currentThread().getId(); -// RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); -// tryLockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// if (future.getNow()) { -// long spentTime = System.currentTimeMillis() - startTime; -// long remainTime = unit.toMillis(timeout) - spentTime; -// if (remainTime > 0) { -// final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS); -// pollFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// RFuture unlockFuture = fairLock.unlockAsync(threadId); -// unlockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// if (!pollFuture.isSuccess()) { -// promise.tryFailure(pollFuture.cause()); -// return; -// } -// -// promise.trySuccess(pollFuture.getNow()); -// } -// }); -// } -// }); -// } else { -// RFuture unlockFuture = fairLock.unlockAsync(threadId); -// unlockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// promise.trySuccess(null); -// } -// }); -// } -// } else { -// promise.trySuccess(null); -// } -// } -// }); -// -// return promise; -// return null; -// } - -// @Override -// public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { -// long startTime = System.currentTimeMillis(); -// if (fairLock.tryLock(timeout, unit)) { -// try { -// long spentTime = System.currentTimeMillis() - startTime; -// long remainTime = unit.toMillis(timeout) - spentTime; -// if (remainTime > 0) { -// return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS); -// } -// return null; -// } finally { -// fairLock.unlock(); -// } -// } -// return null; -// } - -// @Override -// public RFuture pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) { -// final long startTime = System.currentTimeMillis(); -// final RPromise promise = newPromise(); -// final long threadId = Thread.currentThread().getId(); -// RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); -// tryLockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// if (future.getNow()) { -// long spentTime = System.currentTimeMillis() - startTime; -// long remainTime = unit.toMillis(timeout) - spentTime; -// if (remainTime > 0) { -// final RFuture pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS); -// pollFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// RFuture unlockFuture = fairLock.unlockAsync(threadId); -// unlockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// if (!pollFuture.isSuccess()) { -// promise.tryFailure(pollFuture.cause()); -// return; -// } -// -// promise.trySuccess(pollFuture.getNow()); -// } -// }); -// } -// }); -// } else { -// RFuture unlockFuture = fairLock.unlockAsync(threadId); -// unlockFuture.addListener(new FutureListener() { -// @Override -// public void operationComplete(Future future) throws Exception { -// if (!future.isSuccess()) { -// promise.tryFailure(future.cause()); -// return; -// } -// -// promise.trySuccess(null); -// } -// }); -// } -// } else { -// promise.trySuccess(null); -// } -// } -// }); -// -// return promise; -// return null; -// } + @Override + public RFuture pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) { + final long startTime = System.currentTimeMillis(); + final RPromise promise = newPromise(); + + RFuture tryAcquireFuture = tryAcquireAsync(); + tryAcquireFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + Long currentTimeout = future.getNow(); + if (currentTimeout == null) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + }); + } else { + promise.trySuccess(null); + } + } else { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + remainTime = Math.min(remainTime, currentTimeout); + if (remainTime <= 0) { + promise.trySuccess(null); + return; + } + + final RFuture subscribeFuture = subscribe(); + final AtomicReference futureRef = new AtomicReference(); + subscribeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (futureRef.get() != null) { + futureRef.get().cancel(); + } + + tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName); + } + }); + if (!subscribeFuture.isDone()) { + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (!subscribeFuture.isDone()) { + subscribeFuture.cancel(false); + promise.trySuccess(null); + } + } + }, remainTime, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); + } + } + } + }); + + return promise; + } + + private void tryPollLastAndOfferFirstToAsync(final long startTime, final long timeout, final TimeUnit unit, + final RFuture subscribeFuture, final RPromise promise, final String queueName) { + if (promise.isDone()) { + unsubscribe(subscribeFuture); + return; + } + + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime <= 0) { + unsubscribe(subscribeFuture); + promise.trySuccess(null); + return; + } + + RFuture tryAcquireFuture = tryAcquireAsync(); + tryAcquireFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + unsubscribe(subscribeFuture); + promise.tryFailure(future.cause()); + return; + } + + Long currentTimeout = future.getNow(); + if (currentTimeout == null) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + unsubscribe(subscribeFuture); + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + }); + } else { + unsubscribe(subscribeFuture); + promise.trySuccess(null); + } + } else { + final RedissonLockEntry entry = getEntry(); + synchronized (entry) { + if (entry.getLatch().tryAcquire()) { + tryPollAsync(startTime, timeout, unit, subscribeFuture, promise); + } else { + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicReference futureRef = new AtomicReference(); + final Runnable listener = new Runnable() { + @Override + public void run() { + executed.set(true); + if (futureRef.get() != null) { + futureRef.get().cancel(); + } + + tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName); + } + }; + entry.addListener(listener); + + if (!executed.get()) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout t) throws Exception { + synchronized (entry) { + if (entry.removeListener(listener)) { + tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName); + } + } + } + }, remainTime, TimeUnit.MILLISECONDS); + futureRef.set(scheduledFuture); + } + } + } + } + }; + }); + } + + } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java index 740379338..ff07e03e7 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java @@ -14,11 +14,101 @@ import org.redisson.api.RedissonClient; public class RedissonBlockingFairQueueTest extends BaseTest { @Test - public void testFairness() throws InterruptedException { + public void testTimeout() throws InterruptedException { int size = 1000; + CountDownLatch latch = new CountDownLatch(size); + AtomicInteger t1Counter = new AtomicInteger(); + AtomicInteger t2Counter = new AtomicInteger(); + AtomicInteger t3Counter = new AtomicInteger(); + RedissonClient redisson1 = createInstance(); + RBlockingFairQueue queue1 = redisson1.getBlockingFairQueue("test"); + Thread t1 = new Thread("test-thread1") { + public void run() { + while (true) { + try { + String a = queue1.poll(5, TimeUnit.SECONDS); + if (latch.getCount() == 0) { + break; + } + if (a == null) { + continue; + } + latch.countDown(); + t1Counter.incrementAndGet(); + } catch (InterruptedException e) { + } + } + }; + }; + + RedissonClient redisson2 = createInstance(); + RBlockingFairQueue queue2 = redisson2.getBlockingFairQueue("test"); + Thread t2 = new Thread("test-thread2") { + public void run() { + try { + String a = queue2.poll(2, TimeUnit.SECONDS); + if (a != null) { + latch.countDown(); + t2Counter.incrementAndGet(); + } + } catch (InterruptedException e) { + } + }; + }; + + RedissonClient redisson3 = createInstance(); + RBlockingFairQueue queue3 = redisson3.getBlockingFairQueue("test"); + Thread t3 = new Thread("test-thread3") { + public void run() { + while (true) { + try { + String a = queue3.poll(5, TimeUnit.SECONDS); + if (latch.getCount() == 0) { + break; + } + if (a == null) { + continue; + } + latch.countDown(); + t3Counter.incrementAndGet(); + } catch (InterruptedException e) { + } + } + }; + }; + + t1.start(); + t1.join(500); + t2.start(); + t2.join(500); + t3.start(); + t3.join(500); + RBlockingQueue queue = redisson.getBlockingFairQueue("test"); + assertThat(redisson.getList("{" + queue.getName() + "}:list").size()).isEqualTo(3); + for (int i = 0; i < size; i++) { + queue.add("" + i); + } + + t1.join(); + t2.join(); + t3.join(); + + assertThat(latch.await(50, TimeUnit.SECONDS)).isTrue(); + + assertThat(t1Counter.get()).isBetween(499, 500); + assertThat(t2Counter.get()).isEqualTo(1); + assertThat(t3Counter.get()).isBetween(499, 500); + + assertThat(redisson.getList("{" + queue.getName() + "}:list").size()).isEqualTo(2); + } + + @Test + public void testFairness() throws InterruptedException { + int size = 1000; + CountDownLatch latch = new CountDownLatch(size); AtomicInteger t1Counter = new AtomicInteger(); AtomicInteger t2Counter = new AtomicInteger(); @@ -99,6 +189,7 @@ public class RedissonBlockingFairQueueTest extends BaseTest { }; }; + RBlockingQueue queue = redisson.getBlockingFairQueue("test"); for (int i = 0; i < size; i++) { queue.add("" + i); } @@ -128,11 +219,7 @@ public class RedissonBlockingFairQueueTest extends BaseTest { assertThat(t2Counter.get()).isEqualTo(250); assertThat(t3Counter.get()).isEqualTo(250); assertThat(t4Counter.get()).isEqualTo(250); - - System.out.println("t1: " + t1Counter.get()); - System.out.println("t2: " + t2Counter.get()); - System.out.println("t3: " + t3Counter.get()); - System.out.println("t4: " + t4Counter.get()); + assertThat(redisson.getKeys().count()).isEqualTo(1); } }