Client Id auto expiration for RedissonBlockingFairQueue implemented

pull/748/head
Nikita 8 years ago
parent 923498fff0
commit 90dd916785

@ -19,7 +19,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RFuture;
@ -28,8 +30,14 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
@ -37,6 +45,8 @@ import org.redisson.pubsub.SemaphorePubSub;
*/
public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> implements RBlockingFairQueue<V> {
public static final long TIMEOUT_SECONDS = 30;
private final UUID id;
private final AtomicInteger instances = new AtomicInteger();
private final SemaphorePubSub semaphorePubSub;
@ -59,6 +69,10 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
return suffixName(getName(), "list");
}
private String getTimeoutName() {
return suffixName(getName(), "timeout");
}
private String getChannelName() {
return suffixName(getName(), getCurrentId() + ":channel");
}
@ -77,39 +91,55 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getIdsListName());
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getIdsListName(), getTimeoutName());
}
private boolean tryAcquire() {
private Long tryAcquire() {
return get(tryAcquireAsync());
}
private RFuture<Boolean> tryAcquireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local items = redis.call('lrange', KEYS[2], 0, -1) " +
"local found = false; " +
"for i=1,#items do " +
"if items[i] == ARGV[1] then " +
"found = true; " +
"break;" +
"end; " +
"end; "
private RFuture<Long> tryAcquireAsync() {
long timeout = System.currentTimeMillis() + TIMEOUT_SECONDS*1000;
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local timeout = redis.call('get', KEYS[3]);"
+ "if timeout ~= false and tonumber(timeout) <= tonumber(ARGV[3]) then "
+ "redis.call('lpop', KEYS[2]); "
+ "local nextValue = redis.call('lindex', KEYS[2], 0); "
+ "if nextValue ~= false and nextValue ~= ARGV[1] then "
+ "redis.call('set', KEYS[3], ARGV[2]);"
+ "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);"
+ "end; "
+ "end; "
+ "local items = redis.call('lrange', KEYS[2], 0, -1) "
+ "local found = false; "
+ "for i=1,#items do "
+ "if items[i] == ARGV[1] then "
+ "found = true; "
+ "break;"
+ "end; "
+ "end; "
+ "if found == false then "
+ "redis.call('lpush', KEYS[2], ARGV[1]); "
+ "end; "
+ "local value = redis.call('lindex', KEYS[2], 0); "
+ "local size = redis.call('llen', KEYS[2]); "
+ "if value ~= false and value == ARGV[1] then "
+ "if value == ARGV[1] then "
+ "redis.call('set', KEYS[3], ARGV[2]);"
+ "local size = redis.call('llen', KEYS[2]); "
+ "if size > 1 then "
+ "redis.call('lpop', KEYS[2]);"
+ "redis.call('rpush', KEYS[2], value);"
+ "local nextValue = redis.call('lindex', KEYS[2], 0); "
+ "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);"
+ "end; "
+ "return 1;"
+ "end;" +
"return 0;",
Arrays.<Object>asList(getName(), getIdsListName()), getCurrentId());
+ "return nil;"
+ "end;"
+ "return tonumber(timeout) - tonumber(ARGV[3]);",
Arrays.<Object>asList(getName(), getIdsListName(), getTimeoutName()), getCurrentId(), timeout, System.currentTimeMillis());
}
private String getCurrentId() {
@ -119,7 +149,8 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
@Override
public V take() throws InterruptedException {
if (tryAcquire()) {
Long currentTimeout = tryAcquire();
if (currentTimeout == null) {
return super.take();
}
@ -127,11 +158,12 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire()) {
currentTimeout = tryAcquire();
if (currentTimeout == null) {
return super.take();
}
getEntry().getLatch().acquire(1);
getEntry().getLatch().tryAcquire(currentTimeout, TimeUnit.MILLISECONDS);
}
} finally {
unsubscribe(future);
@ -149,145 +181,424 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
}
}
// @Override
// public RFuture<V> takeAsync() {
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Void> lockFuture = fairLock.lockAsync();
// lockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// final RFuture<V> takeFuture = takeAsync();
// takeFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!takeFuture.isSuccess()) {
// promise.tryFailure(takeFuture.cause());
// return;
// }
//
// promise.trySuccess(takeFuture.getNow());
// }
// });
// }
// });
// }
// });
//
// return promise;
// return null;
// }
@Override
public RFuture<V> takeAsync() {
final RPromise<V> promise = newPromise();
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
tryAcquireFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
final Long currentTimeout = future.getNow();
if (currentTimeout == null) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.takeAsync();
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
} else {
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (futureRef.get() != null) {
futureRef.get().cancel();
}
tryTakeAsync(subscribeFuture, promise);
}
});
}
}
});
return promise;
}
@Override
public V poll() {
if (tryAcquire()) {
Long currentTimeout = tryAcquire();
if (currentTimeout == null) {
return super.poll();
}
return null;
}
@Override
public RFuture<V> pollAsync() {
final RPromise<V> promise = newPromise();
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
tryAcquireFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
final Long currentTimeout = future.getNow();
if (currentTimeout == null) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync();
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
} else {
promise.trySuccess(null);
}
}
});
return promise;
}
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
Long currentTimeout = tryAcquire();
if (currentTimeout == null) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
}
return null;
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (!future.awaitUninterruptibly(remainTime, TimeUnit.MILLISECONDS)) {
return null;
}
try {
while (true) {
if (tryAcquire()) {
return super.poll();
currentTimeout = tryAcquire();
if (currentTimeout == null) {
spentTime = System.currentTimeMillis() - startTime;
remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
}
return null;
}
getEntry().getLatch().acquireUninterruptibly(1);
spentTime = System.currentTimeMillis() - startTime;
remainTime = unit.toMillis(timeout) - spentTime;
remainTime = Math.min(remainTime, currentTimeout);
if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) {
return null;
}
}
} finally {
unsubscribe(future);
}
}
// @Override
// public RFuture<V> pollAsync() {
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync();
// tryLockFuture.addListener(new FutureListener<Boolean>() {
// @Override
// public void operationComplete(Future<Boolean> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (future.getNow()) {
// final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync();
// pollFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!pollFuture.isSuccess()) {
// promise.tryFailure(pollFuture.cause());
// return;
// }
//
// promise.trySuccess(pollFuture.getNow());
// }
// });
// }
// });
// } else {
// promise.trySuccess(null);
// }
// }
// });
//
// return promise;
// return null;
// }
@Override
public RFuture<V> pollAsync(final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
tryAcquireFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
Long currentTimeout = future.getNow();
if (currentTimeout == null) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
} else {
promise.trySuccess(null);
}
} else {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
remainTime = Math.min(remainTime, currentTimeout);
if (remainTime <= 0) {
promise.trySuccess(null);
return;
}
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (futureRef.get() != null) {
futureRef.get().cancel();
}
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
}
});
if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
promise.trySuccess(null);
}
}
}, remainTime, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
}
}
});
return promise;
}
private void tryTakeAsync(final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<V> promise) {
if (promise.isDone()) {
unsubscribe(subscribeFuture);
return;
}
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
tryAcquireFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
unsubscribe(subscribeFuture);
promise.tryFailure(future.cause());
return;
}
Long currentTimeout = future.getNow();
if (currentTimeout == null) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.takeAsync();
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
unsubscribe(subscribeFuture);
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
} else {
final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire()) {
tryTakeAsync(subscribeFuture, promise);
} else {
final AtomicBoolean executed = new AtomicBoolean();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Runnable listener = new Runnable() {
@Override
public void run() {
executed.set(true);
if (futureRef.get() != null) {
futureRef.get().cancel();
}
tryTakeAsync(subscribeFuture, promise);
}
};
entry.addListener(listener);
if (!executed.get()) {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) {
tryTakeAsync(subscribeFuture, promise);
}
}
}
}, currentTimeout, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
}
}
}
};
});
}
private void tryPollAsync(final long startTime, final long timeout, final TimeUnit unit,
final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<V> promise) {
if (promise.isDone()) {
unsubscribe(subscribeFuture);
return;
}
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime <= 0) {
unsubscribe(subscribeFuture);
promise.trySuccess(null);
return;
}
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
tryAcquireFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
unsubscribe(subscribeFuture);
promise.tryFailure(future.cause());
return;
}
Long currentTimeout = future.getNow();
if (currentTimeout == null) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
unsubscribe(subscribeFuture);
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
} else {
unsubscribe(subscribeFuture);
promise.trySuccess(null);
}
} else {
final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire()) {
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
} else {
final AtomicBoolean executed = new AtomicBoolean();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Runnable listener = new Runnable() {
@Override
public void run() {
executed.set(true);
if (futureRef.get() != null) {
futureRef.get().cancel();
}
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
}
};
entry.addListener(listener);
if (!executed.get()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) {
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
}
}
}
}, remainTime, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
}
}
}
};
});
}
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
if (tryAcquire()) {
Long currentTimeout = tryAcquire();
if (currentTimeout == null) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
}
return null;
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (!future.awaitUninterruptibly(remainTime, TimeUnit.MILLISECONDS)) {
return null;
}
try {
while (true) {
if (tryAcquire()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
currentTimeout = tryAcquire();
if (currentTimeout == null) {
spentTime = System.currentTimeMillis() - startTime;
remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
}
return null;
}
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
spentTime = System.currentTimeMillis() - startTime;
remainTime = unit.toMillis(timeout) - spentTime;
remainTime = Math.min(remainTime, currentTimeout);
if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) {
return null;
}
@ -296,153 +607,175 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
unsubscribe(future);
}
}
// @Override
// public RFuture<V> pollAsync(final long timeout, final TimeUnit unit) {
// final long startTime = System.currentTimeMillis();
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
// tryLockFuture.addListener(new FutureListener<Boolean>() {
// @Override
// public void operationComplete(Future<Boolean> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (future.getNow()) {
// long spentTime = System.currentTimeMillis() - startTime;
// long remainTime = unit.toMillis(timeout) - spentTime;
// if (remainTime > 0) {
// final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
// pollFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!pollFuture.isSuccess()) {
// promise.tryFailure(pollFuture.cause());
// return;
// }
//
// promise.trySuccess(pollFuture.getNow());
// }
// });
// }
// });
// } else {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// promise.trySuccess(null);
// }
// });
// }
// } else {
// promise.trySuccess(null);
// }
// }
// });
//
// return promise;
// return null;
// }
// @Override
// public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
// long startTime = System.currentTimeMillis();
// if (fairLock.tryLock(timeout, unit)) {
// try {
// long spentTime = System.currentTimeMillis() - startTime;
// long remainTime = unit.toMillis(timeout) - spentTime;
// if (remainTime > 0) {
// return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
// }
// return null;
// } finally {
// fairLock.unlock();
// }
// }
// return null;
// }
// @Override
// public RFuture<V> pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) {
// final long startTime = System.currentTimeMillis();
// final RPromise<V> promise = newPromise();
// final long threadId = Thread.currentThread().getId();
// RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
// tryLockFuture.addListener(new FutureListener<Boolean>() {
// @Override
// public void operationComplete(Future<Boolean> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (future.getNow()) {
// long spentTime = System.currentTimeMillis() - startTime;
// long remainTime = unit.toMillis(timeout) - spentTime;
// if (remainTime > 0) {
// final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
// pollFuture.addListener(new FutureListener<V>() {
// @Override
// public void operationComplete(Future<V> future) throws Exception {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// if (!pollFuture.isSuccess()) {
// promise.tryFailure(pollFuture.cause());
// return;
// }
//
// promise.trySuccess(pollFuture.getNow());
// }
// });
// }
// });
// } else {
// RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
// unlockFuture.addListener(new FutureListener<Void>() {
// @Override
// public void operationComplete(Future<Void> future) throws Exception {
// if (!future.isSuccess()) {
// promise.tryFailure(future.cause());
// return;
// }
//
// promise.trySuccess(null);
// }
// });
// }
// } else {
// promise.trySuccess(null);
// }
// }
// });
//
// return promise;
// return null;
// }
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
tryAcquireFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
Long currentTimeout = future.getNow();
if (currentTimeout == null) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
} else {
promise.trySuccess(null);
}
} else {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
remainTime = Math.min(remainTime, currentTimeout);
if (remainTime <= 0) {
promise.trySuccess(null);
return;
}
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (futureRef.get() != null) {
futureRef.get().cancel();
}
tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName);
}
});
if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
promise.trySuccess(null);
}
}
}, remainTime, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
}
}
});
return promise;
}
private void tryPollLastAndOfferFirstToAsync(final long startTime, final long timeout, final TimeUnit unit,
final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<V> promise, final String queueName) {
if (promise.isDone()) {
unsubscribe(subscribeFuture);
return;
}
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime <= 0) {
unsubscribe(subscribeFuture);
promise.trySuccess(null);
return;
}
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
tryAcquireFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
unsubscribe(subscribeFuture);
promise.tryFailure(future.cause());
return;
}
Long currentTimeout = future.getNow();
if (currentTimeout == null) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
unsubscribe(subscribeFuture);
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
} else {
unsubscribe(subscribeFuture);
promise.trySuccess(null);
}
} else {
final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire()) {
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
} else {
final AtomicBoolean executed = new AtomicBoolean();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Runnable listener = new Runnable() {
@Override
public void run() {
executed.set(true);
if (futureRef.get() != null) {
futureRef.get().cancel();
}
tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName);
}
};
entry.addListener(listener);
if (!executed.get()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) {
tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName);
}
}
}
}, remainTime, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
}
}
}
};
});
}
}

@ -14,11 +14,101 @@ import org.redisson.api.RedissonClient;
public class RedissonBlockingFairQueueTest extends BaseTest {
@Test
public void testFairness() throws InterruptedException {
public void testTimeout() throws InterruptedException {
int size = 1000;
CountDownLatch latch = new CountDownLatch(size);
AtomicInteger t1Counter = new AtomicInteger();
AtomicInteger t2Counter = new AtomicInteger();
AtomicInteger t3Counter = new AtomicInteger();
RedissonClient redisson1 = createInstance();
RBlockingFairQueue<String> queue1 = redisson1.getBlockingFairQueue("test");
Thread t1 = new Thread("test-thread1") {
public void run() {
while (true) {
try {
String a = queue1.poll(5, TimeUnit.SECONDS);
if (latch.getCount() == 0) {
break;
}
if (a == null) {
continue;
}
latch.countDown();
t1Counter.incrementAndGet();
} catch (InterruptedException e) {
}
}
};
};
RedissonClient redisson2 = createInstance();
RBlockingFairQueue<String> queue2 = redisson2.getBlockingFairQueue("test");
Thread t2 = new Thread("test-thread2") {
public void run() {
try {
String a = queue2.poll(2, TimeUnit.SECONDS);
if (a != null) {
latch.countDown();
t2Counter.incrementAndGet();
}
} catch (InterruptedException e) {
}
};
};
RedissonClient redisson3 = createInstance();
RBlockingFairQueue<String> queue3 = redisson3.getBlockingFairQueue("test");
Thread t3 = new Thread("test-thread3") {
public void run() {
while (true) {
try {
String a = queue3.poll(5, TimeUnit.SECONDS);
if (latch.getCount() == 0) {
break;
}
if (a == null) {
continue;
}
latch.countDown();
t3Counter.incrementAndGet();
} catch (InterruptedException e) {
}
}
};
};
t1.start();
t1.join(500);
t2.start();
t2.join(500);
t3.start();
t3.join(500);
RBlockingQueue<String> queue = redisson.getBlockingFairQueue("test");
assertThat(redisson.getList("{" + queue.getName() + "}:list").size()).isEqualTo(3);
for (int i = 0; i < size; i++) {
queue.add("" + i);
}
t1.join();
t2.join();
t3.join();
assertThat(latch.await(50, TimeUnit.SECONDS)).isTrue();
assertThat(t1Counter.get()).isBetween(499, 500);
assertThat(t2Counter.get()).isEqualTo(1);
assertThat(t3Counter.get()).isBetween(499, 500);
assertThat(redisson.getList("{" + queue.getName() + "}:list").size()).isEqualTo(2);
}
@Test
public void testFairness() throws InterruptedException {
int size = 1000;
CountDownLatch latch = new CountDownLatch(size);
AtomicInteger t1Counter = new AtomicInteger();
AtomicInteger t2Counter = new AtomicInteger();
@ -99,6 +189,7 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
};
};
RBlockingQueue<String> queue = redisson.getBlockingFairQueue("test");
for (int i = 0; i < size; i++) {
queue.add("" + i);
}
@ -128,11 +219,7 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
assertThat(t2Counter.get()).isEqualTo(250);
assertThat(t3Counter.get()).isEqualTo(250);
assertThat(t4Counter.get()).isEqualTo(250);
System.out.println("t1: " + t1Counter.get());
System.out.println("t2: " + t2Counter.get());
System.out.println("t3: " + t3Counter.get());
System.out.println("t4: " + t4Counter.get());
assertThat(redisson.getKeys().count()).isEqualTo(1);
}
}

Loading…
Cancel
Save