BlockingFairQueue implementation #695

pull/709/head
Nikita 8 years ago
parent 452cdf364c
commit 2b8d524b14

@ -29,6 +29,7 @@ import org.redisson.api.RBatch;
import org.redisson.api.RBinaryStream;
import org.redisson.api.RBitSet;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RBoundedBlockingQueue;
@ -92,6 +93,7 @@ public class Redisson implements RedissonClient {
RedissonReference.warmUp();
}
protected final QueueTransferService queueTransferService = new QueueTransferService();
protected final EvictionScheduler evictionScheduler;
protected final CommandExecutor commandExecutor;
protected final ConnectionManager connectionManager;
@ -423,6 +425,16 @@ public class Redisson implements RedissonClient {
return new RedissonPatternTopic<M>(codec, commandExecutor, pattern);
}
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name) {
return new RedissonBlockingFairQueue<V>(queueTransferService, commandExecutor, name, id);
}
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name, Codec codec) {
return new RedissonBlockingFairQueue<V>(queueTransferService, codec, commandExecutor, name, id);
}
@Override
public <V> RQueue<V> getQueue(String name) {
return new RedissonQueue<V>(commandExecutor, name);

@ -0,0 +1,340 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> implements RBlockingFairQueue<V> {
private final RedissonFairLock fairLock;
private final QueueTransferService queueTransferService;
protected RedissonBlockingFairQueue(QueueTransferService queueTransferService, CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
String lockName = prefixName("redisson_bfq_lock", name);
fairLock = new RedissonFairLock(commandExecutor, lockName, id);
this.queueTransferService = queueTransferService;
}
protected RedissonBlockingFairQueue(QueueTransferService queueTransferService, Codec codec, CommandExecutor commandExecutor, String name, UUID id) {
super(codec, commandExecutor, name);
String lockName = prefixName("redisson_bfq_lock", name);
fairLock = new RedissonFairLock(commandExecutor, lockName, id);
this.queueTransferService = queueTransferService;
}
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), fairLock.getName(), fairLock.getThreadsQueueName(), fairLock.getTimeoutSetName());
}
@Override
public V take() throws InterruptedException {
fairLock.lockInterruptibly();
try {
return super.take();
} finally {
fairLock.unlock();
}
}
@Override
public RFuture<V> takeAsync() {
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = fairLock.lockAsync();
lockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
final RFuture<V> takeFuture = takeAsync();
takeFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!takeFuture.isSuccess()) {
promise.tryFailure(takeFuture.cause());
return;
}
promise.trySuccess(takeFuture.getNow());
}
});
}
});
}
});
return promise;
}
@Override
public V poll() {
if (fairLock.tryLock()) {
try {
return super.poll();
} finally {
fairLock.unlock();
}
}
return null;
}
@Override
public RFuture<V> pollAsync() {
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync();
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (future.getNow()) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync();
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}
promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
promise.trySuccess(null);
}
}
});
return promise;
}
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
if (fairLock.tryLock(timeout, unit)) {
try {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
}
return null;
} finally {
fairLock.unlock();
}
}
return null;
}
@Override
public RFuture<V> pollAsync(final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (future.getNow()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}
promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(null);
}
});
}
} else {
promise.trySuccess(null);
}
}
});
return promise;
}
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
if (fairLock.tryLock(timeout, unit)) {
try {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
}
return null;
} finally {
fairLock.unlock();
}
}
return null;
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (future.getNow()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}
promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(null);
}
});
}
} else {
promise.trySuccess(null);
}
}
});
return promise;
}
public RDelayedQueue<V> getDealyedQueue() {
return null;
}
}

@ -48,11 +48,11 @@ public class RedissonFairLock extends RedissonLock implements RLock {
}
String getThreadsQueueName() {
return "redisson_lock_queue:{" + getName() + "}";
return prefixName("redisson_lock_queue", getName());
}
String getThreadElementName(long threadId) {
return "redisson_lock_thread:{" + getName() + "}:" + getLockName(threadId);
String getTimeoutSetName() {
return prefixName("redisson_lock_timeout", getName());
}
@Override
@ -77,6 +77,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
internalLockLeaseTime = unit.toMillis(leaseTime);
long threadWaitTime = 5000;
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads
@ -85,7 +86,9 @@ public class RedissonFairLock extends RedissonLock implements RLock {
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[3]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
@ -96,7 +99,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]); " +
"redis.call('del', KEYS[3]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
@ -107,7 +110,8 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"return nil; " +
"end; " +
"return 1;",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getThreadElementName(threadId)), internalLockLeaseTime, getLockName(threadId));
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()),
internalLockLeaseTime, getLockName(threadId), currentTime);
}
if (command == RedisCommands.EVAL_LONG) {
@ -118,17 +122,19 @@ public class RedissonFairLock extends RedissonLock implements RLock {
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+
"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]); " +
"redis.call('del', KEYS[3]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
@ -138,19 +144,22 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"local firstThreadId = redis.call('lindex', KEYS[2], 0)" +
"local ttl = redis.call('pttl', KEYS[1]); " +
"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
"local ttl; " +
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
"ttl = redis.call('pttl', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId);" +
"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else "
+ "ttl = redis.call('pttl', KEYS[1]);" +
"end; " +
"if redis.call('exists', KEYS[3]) == 0 then " +
"local timeout = ttl + tonumber(ARGV[3]);" +
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"redis.call('set', KEYS[3], 1);" +
"end; " +
"redis.call('pexpire', KEYS[3], ttl + tonumber(ARGV[3]));" +
"return ttl;",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getThreadElementName(threadId)),
internalLockLeaseTime, getLockName(threadId), threadWaitTime);
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()),
internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
}
throw new IllegalArgumentException();
@ -158,25 +167,39 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override
public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+
"if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[3], 0); " +
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[2] .. ':' .. nextThreadId, ARGV[1]); " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
@ -189,29 +212,27 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[3], 0); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[2] .. ':' .. nextThreadId, ARGV[1]); " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName(), getThreadsQueueName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getThreadsQueueName(), getTimeoutSetName());
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal();
@ -222,7 +243,9 @@ public class RedissonFairLock extends RedissonLock implements RLock {
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[2]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
@ -233,12 +256,13 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"if (redis.call('del', KEYS[1]) == 1) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[3] .. ':' .. nextThreadId, ARGV[1]); " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1 " +
"end " +
"return 1; " +
"end; " +
"return 0;",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getChannelName()), LockPubSub.unlockMessage);
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()),
LockPubSub.unlockMessage, System.currentTimeMillis());
}
}

@ -77,10 +77,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
String getChannelName() {
if (getName().contains("{")) {
return "redisson_lock__channel:" + getName();
}
return "redisson_lock__channel__{" + getName() + "}";
return prefixName("redisson_lock__channel", getName());
}
String getLockName(long threadId) {
@ -351,25 +348,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
@ -418,6 +397,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return isExists();
}
@Override
public RFuture<Boolean> isExistsAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.EXISTS, getName());
}
@Override
public boolean isHeldByCurrentThread() {
return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
@ -442,27 +426,32 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return unlockAsync(threadId);
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = newPromise();
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override

@ -51,6 +51,13 @@ public abstract class RedissonObject implements RObject {
return commandExecutor.await(future, timeout, timeoutUnit);
}
protected String prefixName(String prefix, String name) {
if (getName().contains("{")) {
return prefix + ":" + name;
}
return prefix + ":{" + getName() + "}";
}
protected <V> V get(RFuture<V> future) {
return commandExecutor.get(future);
}

@ -0,0 +1,30 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* Blocking queue with fair polling and
* guarantees access order for poll and take methods.
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public interface RBlockingFairQueue<V> extends RBlockingQueue<V> {
RDelayedQueue<V> getDealyedQueue();
}

@ -504,11 +504,21 @@ public interface RedissonClient {
*/
<M> RPatternTopic<M> getPatternTopic(String pattern, Codec codec);
/**
* Returns unbounded fair queue instance by name
*
* @param name of queue
* @return queue
*/
<V> RBlockingFairQueue<V> getBlockingFairQueue(String name);
<V> RBlockingFairQueue<V> getBlockingFairQueue(String name, Codec codec);
/**
* Returns unbounded queue instance by name.
*
* @param <V> type of value
* @param name - name of object
* @param name of object
* @return Queue object
*/
<V> RQueue<V> getQueue(String name);

Loading…
Cancel
Save