From 2b8d524b142063b40e022e2ee4b1a6cfe7708c42 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 10 Nov 2016 11:04:01 +0300 Subject: [PATCH] BlockingFairQueue implementation #695 --- .../src/main/java/org/redisson/Redisson.java | 12 + .../redisson/RedissonBlockingFairQueue.java | 340 ++++++++++++++++++ .../java/org/redisson/RedissonFairLock.java | 106 +++--- .../main/java/org/redisson/RedissonLock.java | 73 ++-- .../java/org/redisson/RedissonObject.java | 7 + .../org/redisson/api/RBlockingFairQueue.java | 30 ++ .../java/org/redisson/api/RedissonClient.java | 12 +- 7 files changed, 496 insertions(+), 84 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java create mode 100644 redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index e40f4c800..b486ae56d 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -29,6 +29,7 @@ import org.redisson.api.RBatch; import org.redisson.api.RBinaryStream; import org.redisson.api.RBitSet; import org.redisson.api.RBlockingDeque; +import org.redisson.api.RBlockingFairQueue; import org.redisson.api.RBlockingQueue; import org.redisson.api.RBloomFilter; import org.redisson.api.RBoundedBlockingQueue; @@ -92,6 +93,7 @@ public class Redisson implements RedissonClient { RedissonReference.warmUp(); } + protected final QueueTransferService queueTransferService = new QueueTransferService(); protected final EvictionScheduler evictionScheduler; protected final CommandExecutor commandExecutor; protected final ConnectionManager connectionManager; @@ -423,6 +425,16 @@ public class Redisson implements RedissonClient { return new RedissonPatternTopic(codec, commandExecutor, pattern); } + @Override + public RBlockingFairQueue getBlockingFairQueue(String name) { + return new RedissonBlockingFairQueue(queueTransferService, commandExecutor, name, id); + } + + @Override + public RBlockingFairQueue getBlockingFairQueue(String name, Codec codec) { + return new RedissonBlockingFairQueue(queueTransferService, codec, commandExecutor, name, id); + } + @Override public RQueue getQueue(String name) { return new RedissonQueue(commandExecutor, name); diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java new file mode 100644 index 000000000..ffe2b9540 --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java @@ -0,0 +1,340 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.redisson.api.RBlockingFairQueue; +import org.redisson.api.RDelayedQueue; +import org.redisson.api.RFuture; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandExecutor; +import org.redisson.misc.RPromise; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonBlockingFairQueue extends RedissonBlockingQueue implements RBlockingFairQueue { + + private final RedissonFairLock fairLock; + private final QueueTransferService queueTransferService; + + protected RedissonBlockingFairQueue(QueueTransferService queueTransferService, CommandExecutor commandExecutor, String name, UUID id) { + super(commandExecutor, name); + String lockName = prefixName("redisson_bfq_lock", name); + fairLock = new RedissonFairLock(commandExecutor, lockName, id); + this.queueTransferService = queueTransferService; + } + + protected RedissonBlockingFairQueue(QueueTransferService queueTransferService, Codec codec, CommandExecutor commandExecutor, String name, UUID id) { + super(codec, commandExecutor, name); + String lockName = prefixName("redisson_bfq_lock", name); + fairLock = new RedissonFairLock(commandExecutor, lockName, id); + this.queueTransferService = queueTransferService; + } + + @Override + public RFuture deleteAsync() { + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), fairLock.getName(), fairLock.getThreadsQueueName(), fairLock.getTimeoutSetName()); + } + + @Override + public V take() throws InterruptedException { + fairLock.lockInterruptibly(); + try { + return super.take(); + } finally { + fairLock.unlock(); + } + } + + @Override + public RFuture takeAsync() { + final RPromise promise = newPromise(); + final long threadId = Thread.currentThread().getId(); + RFuture lockFuture = fairLock.lockAsync(); + lockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + final RFuture takeFuture = takeAsync(); + takeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + RFuture unlockFuture = fairLock.unlockAsync(threadId); + unlockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (!takeFuture.isSuccess()) { + promise.tryFailure(takeFuture.cause()); + return; + } + + promise.trySuccess(takeFuture.getNow()); + } + }); + } + }); + } + }); + + return promise; + } + + @Override + public V poll() { + if (fairLock.tryLock()) { + try { + return super.poll(); + } finally { + fairLock.unlock(); + } + } + return null; + } + + @Override + public RFuture pollAsync() { + final RPromise promise = newPromise(); + final long threadId = Thread.currentThread().getId(); + RFuture tryLockFuture = fairLock.tryLockAsync(); + tryLockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (future.getNow()) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + RFuture unlockFuture = fairLock.unlockAsync(threadId); + unlockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (!pollFuture.isSuccess()) { + promise.tryFailure(pollFuture.cause()); + return; + } + + promise.trySuccess(pollFuture.getNow()); + } + }); + } + }); + } else { + promise.trySuccess(null); + } + } + }); + + return promise; + } + + + @Override + public V poll(long timeout, TimeUnit unit) throws InterruptedException { + long startTime = System.currentTimeMillis(); + if (fairLock.tryLock(timeout, unit)) { + try { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + return super.poll(remainTime, TimeUnit.MILLISECONDS); + } + return null; + } finally { + fairLock.unlock(); + } + } + return null; + } + + @Override + public RFuture pollAsync(final long timeout, final TimeUnit unit) { + final long startTime = System.currentTimeMillis(); + final RPromise promise = newPromise(); + final long threadId = Thread.currentThread().getId(); + RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); + tryLockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (future.getNow()) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + RFuture unlockFuture = fairLock.unlockAsync(threadId); + unlockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (!pollFuture.isSuccess()) { + promise.tryFailure(pollFuture.cause()); + return; + } + + promise.trySuccess(pollFuture.getNow()); + } + }); + } + }); + } else { + RFuture unlockFuture = fairLock.unlockAsync(threadId); + unlockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(null); + } + }); + } + } else { + promise.trySuccess(null); + } + } + }); + + return promise; + } + + @Override + public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { + long startTime = System.currentTimeMillis(); + if (fairLock.tryLock(timeout, unit)) { + try { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS); + } + return null; + } finally { + fairLock.unlock(); + } + } + return null; + } + + @Override + public RFuture pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) { + final long startTime = System.currentTimeMillis(); + final RPromise promise = newPromise(); + final long threadId = Thread.currentThread().getId(); + RFuture tryLockFuture = fairLock.tryLockAsync(timeout, unit); + tryLockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (future.getNow()) { + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime > 0) { + final RFuture pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS); + pollFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + RFuture unlockFuture = fairLock.unlockAsync(threadId); + unlockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + if (!pollFuture.isSuccess()) { + promise.tryFailure(pollFuture.cause()); + return; + } + + promise.trySuccess(pollFuture.getNow()); + } + }); + } + }); + } else { + RFuture unlockFuture = fairLock.unlockAsync(threadId); + unlockFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(null); + } + }); + } + } else { + promise.trySuccess(null); + } + } + }); + + return promise; + } + + public RDelayedQueue getDealyedQueue() { + return null; + } + +} diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index 0540be769..610ca3499 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -48,11 +48,11 @@ public class RedissonFairLock extends RedissonLock implements RLock { } String getThreadsQueueName() { - return "redisson_lock_queue:{" + getName() + "}"; + return prefixName("redisson_lock_queue", getName()); } - String getThreadElementName(long threadId) { - return "redisson_lock_thread:{" + getName() + "}:" + getLockName(threadId); + String getTimeoutSetName() { + return prefixName("redisson_lock_timeout", getName()); } @Override @@ -77,6 +77,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { internalLockLeaseTime = unit.toMillis(leaseTime); long threadWaitTime = 5000; + long currentTime = System.currentTimeMillis(); if (command == RedisCommands.EVAL_NULL_BOOLEAN) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads @@ -85,7 +86,9 @@ public class RedissonFairLock extends RedissonLock implements RLock { + "if firstThreadId2 == false then " + "break;" + "end; " - + "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then " + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[3]) then " + + "redis.call('zrem', KEYS[3], firstThreadId2); " + "redis.call('lpop', KEYS[2]); " + "else " + "break;" @@ -96,7 +99,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + "redis.call('lpop', KEYS[2]); " + - "redis.call('del', KEYS[3]); " + + "redis.call('zrem', KEYS[3], ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + @@ -107,7 +110,8 @@ public class RedissonFairLock extends RedissonLock implements RLock { "return nil; " + "end; " + "return 1;", - Arrays.asList(getName(), getThreadsQueueName(), getThreadElementName(threadId)), internalLockLeaseTime, getLockName(threadId)); + Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName()), + internalLockLeaseTime, getLockName(threadId), currentTime); } if (command == RedisCommands.EVAL_LONG) { @@ -118,17 +122,19 @@ public class RedissonFairLock extends RedissonLock implements RLock { + "if firstThreadId2 == false then " + "break;" + "end; " - + "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then " + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[4]) then " + + "redis.call('zrem', KEYS[3], firstThreadId2); " + "redis.call('lpop', KEYS[2]); " + "else " + "break;" + "end; " + "end;" - + - "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " + + + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + "redis.call('lpop', KEYS[2]); " + - "redis.call('del', KEYS[3]); " + + "redis.call('zrem', KEYS[3], ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + @@ -138,19 +144,22 @@ public class RedissonFairLock extends RedissonLock implements RLock { "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + - "local firstThreadId = redis.call('lindex', KEYS[2], 0)" + - "local ttl = redis.call('pttl', KEYS[1]); " + + + "local firstThreadId = redis.call('lindex', KEYS[2], 0); " + + "local ttl; " + "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + - "ttl = redis.call('pttl', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId);" + + "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + + "else " + + "ttl = redis.call('pttl', KEYS[1]);" + "end; " + - "if redis.call('exists', KEYS[3]) == 0 then " + + + "local timeout = ttl + tonumber(ARGV[3]);" + + "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + "redis.call('rpush', KEYS[2], ARGV[2]);" + - "redis.call('set', KEYS[3], 1);" + "end; " + - "redis.call('pexpire', KEYS[3], ttl + tonumber(ARGV[3]));" + "return ttl;", - Arrays.asList(getName(), getThreadsQueueName(), getThreadElementName(threadId)), - internalLockLeaseTime, getLockName(threadId), threadWaitTime); + Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName()), + internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime); } throw new IllegalArgumentException(); @@ -158,25 +167,39 @@ public class RedissonFairLock extends RedissonLock implements RLock { @Override public void unlock() { - Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); + + if (opStatus == null) { + throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + + id + " thread-id: " + Thread.currentThread().getId()); + } + if (opStatus) { + cancelExpirationRenewal(); + } + } + + @Override + protected RFuture unlockInnerAsync(long threadId) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads "while true do " + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end; " - + "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then " + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[4]) then " + + "redis.call('zrem', KEYS[3], firstThreadId2); " + "redis.call('lpop', KEYS[2]); " + "else " + "break;" + "end; " + "end;" - + - "if (redis.call('exists', KEYS[1]) == 0) then " + - "local nextThreadId = redis.call('lindex', KEYS[3], 0); " + + + "if (redis.call('exists', KEYS[1]) == 0) then " + + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[2] .. ':' .. nextThreadId, ARGV[1]); " + + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; " + "end;" + @@ -189,29 +212,27 @@ public class RedissonFairLock extends RedissonLock implements RLock { "return 0; " + "else " + "redis.call('del', KEYS[1]); " + - "local nextThreadId = redis.call('lindex', KEYS[3], 0); " + + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[2] .. ':' .. nextThreadId, ARGV[1]); " + + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; "+ "end; " + "return nil;", - Arrays.asList(getName(), getChannelName(), getThreadsQueueName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); - - if (opStatus == null) { - throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " - + id + " thread-id: " + Thread.currentThread().getId()); - } - if (opStatus) { - cancelExpirationRenewal(); - } + Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()), + LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); } - + @Override public Condition newCondition() { throw new UnsupportedOperationException(); } + @Override + public RFuture deleteAsync() { + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getThreadsQueueName(), getTimeoutSetName()); + } + @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(); @@ -222,7 +243,9 @@ public class RedissonFairLock extends RedissonLock implements RLock { + "if firstThreadId2 == false then " + "break;" + "end; " - + "if redis.call('exists', 'redisson_lock_thread:{' .. KEYS[1] .. '}:' .. firstThreadId2) == 0 then " + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[2]) then " + + "redis.call('zrem', KEYS[3], firstThreadId2); " + "redis.call('lpop', KEYS[2]); " + "else " + "break;" @@ -233,12 +256,13 @@ public class RedissonFairLock extends RedissonLock implements RLock { "if (redis.call('del', KEYS[1]) == 1) then " + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[3] .. ':' .. nextThreadId, ARGV[1]); " + + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + - "return 1 " + - "end " + + "return 1; " + + "end; " + "return 0;", - Arrays.asList(getName(), getThreadsQueueName(), getChannelName()), LockPubSub.unlockMessage); + Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()), + LockPubSub.unlockMessage, System.currentTimeMillis()); } } diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index f175fea7e..7abc63daa 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -77,10 +77,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } String getChannelName() { - if (getName().contains("{")) { - return "redisson_lock__channel:" + getName(); - } - return "redisson_lock__channel__{" + getName() + "}"; + return prefixName("redisson_lock__channel", getName()); } String getLockName(long threadId) { @@ -351,25 +348,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void unlock() { - Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if (redis.call('exists', KEYS[1]) == 0) then " + - "redis.call('publish', KEYS[2], ARGV[1]); " + - "return 1; " + - "end;" + - "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + - "return nil;" + - "end; " + - "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + - "if (counter > 0) then " + - "redis.call('pexpire', KEYS[1], ARGV[2]); " + - "return 0; " + - "else " + - "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + - "return 1; "+ - "end; " + - "return nil;", - Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); + Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); @@ -418,6 +397,11 @@ public class RedissonLock extends RedissonExpirable implements RLock { return isExists(); } + @Override + public RFuture isExistsAsync() { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.EXISTS, getName()); + } + @Override public boolean isHeldByCurrentThread() { return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); @@ -442,27 +426,32 @@ public class RedissonLock extends RedissonExpirable implements RLock { return unlockAsync(threadId); } + protected RFuture unlockInnerAsync(long threadId) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if (redis.call('exists', KEYS[1]) == 0) then " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "return 1; " + + "end;" + + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + + "return nil;" + + "end; " + + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + + "if (counter > 0) then " + + "redis.call('pexpire', KEYS[1], ARGV[2]); " + + "return 0; " + + "else " + + "redis.call('del', KEYS[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "return 1; "+ + "end; " + + "return nil;", + Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); + + } + public RFuture unlockAsync(final long threadId) { final RPromise result = newPromise(); - RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if (redis.call('exists', KEYS[1]) == 0) then " + - "redis.call('publish', KEYS[2], ARGV[1]); " + - "return 1; " + - "end;" + - "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + - "return nil;" + - "end; " + - "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + - "if (counter > 0) then " + - "redis.call('pexpire', KEYS[1], ARGV[2]); " + - "return 0; " + - "else " + - "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + - "return 1; "+ - "end; " + - "return nil;", - Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); + RFuture future = unlockInnerAsync(threadId); future.addListener(new FutureListener() { @Override diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 52bca0bc9..af7dc40a0 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -51,6 +51,13 @@ public abstract class RedissonObject implements RObject { return commandExecutor.await(future, timeout, timeoutUnit); } + protected String prefixName(String prefix, String name) { + if (getName().contains("{")) { + return prefix + ":" + name; + } + return prefix + ":{" + getName() + "}"; + } + protected V get(RFuture future) { return commandExecutor.get(future); } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java new file mode 100644 index 000000000..089bcdc5c --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RBlockingFairQueue.java @@ -0,0 +1,30 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * Blocking queue with fair polling and + * guarantees access order for poll and take methods. + * + * @author Nikita Koksharov + * + * @param value + */ +public interface RBlockingFairQueue extends RBlockingQueue { + + RDelayedQueue getDealyedQueue(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index fc786d42a..fafb5bb02 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -504,11 +504,21 @@ public interface RedissonClient { */ RPatternTopic getPatternTopic(String pattern, Codec codec); + /** + * Returns unbounded fair queue instance by name + * + * @param name of queue + * @return queue + */ + RBlockingFairQueue getBlockingFairQueue(String name); + + RBlockingFairQueue getBlockingFairQueue(String name, Codec codec); + /** * Returns unbounded queue instance by name. * * @param type of value - * @param name - name of object + * @param name of object * @return Queue object */ RQueue getQueue(String name);