Merge branch 'master' into 3.0.0
commit
70e019a5cd
@ -1,782 +0,0 @@
|
||||
/**
|
||||
* Copyright 2016 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.redisson.api.RBlockingFairQueue;
|
||||
import org.redisson.api.RFuture;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.codec.LongCodec;
|
||||
import org.redisson.client.codec.StringCodec;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.command.CommandExecutor;
|
||||
import org.redisson.misc.RPromise;
|
||||
import org.redisson.pubsub.SemaphorePubSub;
|
||||
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> implements RBlockingFairQueue<V> {
|
||||
|
||||
public static final long TIMEOUT_SECONDS = 30;
|
||||
|
||||
private final UUID id;
|
||||
private final AtomicInteger instances = new AtomicInteger();
|
||||
private final SemaphorePubSub semaphorePubSub;
|
||||
|
||||
protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id, RedissonClient redisson) {
|
||||
super(commandExecutor, name, redisson);
|
||||
this.semaphorePubSub = semaphorePubSub;
|
||||
this.id = id;
|
||||
instances.incrementAndGet();
|
||||
}
|
||||
|
||||
protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id, RedissonClient redisson) {
|
||||
super(codec, commandExecutor, name, redisson);
|
||||
this.semaphorePubSub = semaphorePubSub;
|
||||
this.id = id;
|
||||
instances.incrementAndGet();
|
||||
}
|
||||
|
||||
private String getIdsListName() {
|
||||
return suffixName(getName(), "list");
|
||||
}
|
||||
|
||||
private String getTimeoutName() {
|
||||
return suffixName(getName(), "timeout");
|
||||
}
|
||||
|
||||
private String getChannelName() {
|
||||
return suffixName(getName(), getCurrentId() + ":channel");
|
||||
}
|
||||
|
||||
private RedissonLockEntry getEntry() {
|
||||
return semaphorePubSub.getEntry(getName());
|
||||
}
|
||||
|
||||
private RFuture<RedissonLockEntry> subscribe() {
|
||||
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
|
||||
}
|
||||
|
||||
private void unsubscribe(RFuture<RedissonLockEntry> future) {
|
||||
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RFuture<Boolean> deleteAsync() {
|
||||
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getIdsListName(), getTimeoutName());
|
||||
}
|
||||
|
||||
private Long tryAcquire() {
|
||||
return get(tryAcquireAsync());
|
||||
}
|
||||
|
||||
private RFuture<Long> tryAcquireAsync() {
|
||||
long timeout = System.currentTimeMillis() + TIMEOUT_SECONDS*1000;
|
||||
|
||||
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
|
||||
|
||||
"local timeout = redis.call('get', KEYS[3]);"
|
||||
+ "if timeout ~= false and tonumber(timeout) <= tonumber(ARGV[3]) then "
|
||||
+ "redis.call('lpop', KEYS[2]); "
|
||||
+ "local nextValue = redis.call('lindex', KEYS[2], 0); "
|
||||
+ "if nextValue ~= false and nextValue ~= ARGV[1] then "
|
||||
+ "redis.call('set', KEYS[3], ARGV[2]);"
|
||||
+ "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);"
|
||||
+ "end; "
|
||||
+ "end; "
|
||||
|
||||
+ "local items = redis.call('lrange', KEYS[2], 0, -1) "
|
||||
+ "local found = false; "
|
||||
+ "for i=1,#items do "
|
||||
+ "if items[i] == ARGV[1] then "
|
||||
+ "found = true; "
|
||||
+ "break;"
|
||||
+ "end; "
|
||||
+ "end; "
|
||||
|
||||
+ "if found == false then "
|
||||
+ "redis.call('lpush', KEYS[2], ARGV[1]); "
|
||||
+ "end; "
|
||||
|
||||
+ "local value = redis.call('lindex', KEYS[2], 0); "
|
||||
+ "if value == ARGV[1] then "
|
||||
+ "redis.call('set', KEYS[3], ARGV[2]);"
|
||||
+ "local size = redis.call('llen', KEYS[2]); "
|
||||
+ "if size > 1 then "
|
||||
+ "redis.call('lpop', KEYS[2]);"
|
||||
+ "redis.call('rpush', KEYS[2], value);"
|
||||
+ "local nextValue = redis.call('lindex', KEYS[2], 0); "
|
||||
+ "redis.call('publish', '{' .. KEYS[1] .. '}:' .. nextValue .. ':channel', 1);"
|
||||
+ "end; "
|
||||
+ "return nil;"
|
||||
+ "end;"
|
||||
+ "return tonumber(timeout) - tonumber(ARGV[3]);",
|
||||
Arrays.<Object>asList(getName(), getIdsListName(), getTimeoutName()), getCurrentId(), timeout, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
private String getCurrentId() {
|
||||
return id.toString();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public V take() throws InterruptedException {
|
||||
Long currentTimeout = tryAcquire();
|
||||
if (currentTimeout == null) {
|
||||
return super.take();
|
||||
}
|
||||
|
||||
RFuture<RedissonLockEntry> future = subscribe();
|
||||
commandExecutor.syncSubscription(future);
|
||||
try {
|
||||
while (true) {
|
||||
currentTimeout = tryAcquire();
|
||||
if (currentTimeout == null) {
|
||||
return super.take();
|
||||
}
|
||||
|
||||
getEntry().getLatch().tryAcquire(currentTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
} finally {
|
||||
unsubscribe(future);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
if (instances.decrementAndGet() == 0) {
|
||||
get(commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
|
||||
"for i = 1, #ARGV, 1 do "
|
||||
+ "redis.call('lrem', KEYS[1], 0, ARGV[i]);"
|
||||
+"end; ",
|
||||
Collections.<Object>singletonList(getIdsListName()), getCurrentId()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RFuture<V> takeAsync() {
|
||||
final RPromise<V> promise = newPromise();
|
||||
|
||||
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
|
||||
tryAcquireFuture.addListener(new FutureListener<Long>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Long> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
final Long currentTimeout = future.getNow();
|
||||
if (currentTimeout == null) {
|
||||
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.takeAsync();
|
||||
pollFuture.addListener(new FutureListener<V>() {
|
||||
@Override
|
||||
public void operationComplete(Future<V> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
promise.trySuccess(future.getNow());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
|
||||
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
|
||||
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
|
||||
@Override
|
||||
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
if (futureRef.get() != null) {
|
||||
futureRef.get().cancel();
|
||||
}
|
||||
|
||||
tryTakeAsync(subscribeFuture, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V poll() {
|
||||
Long currentTimeout = tryAcquire();
|
||||
if (currentTimeout == null) {
|
||||
return super.poll();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RFuture<V> pollAsync() {
|
||||
final RPromise<V> promise = newPromise();
|
||||
|
||||
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
|
||||
tryAcquireFuture.addListener(new FutureListener<Long>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Long> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
final Long currentTimeout = future.getNow();
|
||||
if (currentTimeout == null) {
|
||||
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync();
|
||||
pollFuture.addListener(new FutureListener<V>() {
|
||||
@Override
|
||||
public void operationComplete(Future<V> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
promise.trySuccess(future.getNow());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
promise.trySuccess(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
long startTime = System.currentTimeMillis();
|
||||
Long currentTimeout = tryAcquire();
|
||||
if (currentTimeout == null) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
return super.poll(remainTime, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
RFuture<RedissonLockEntry> future = subscribe();
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (!future.awaitUninterruptibly(remainTime, TimeUnit.MILLISECONDS)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
currentTimeout = tryAcquire();
|
||||
if (currentTimeout == null) {
|
||||
spentTime = System.currentTimeMillis() - startTime;
|
||||
remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
return super.poll(remainTime, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
spentTime = System.currentTimeMillis() - startTime;
|
||||
remainTime = unit.toMillis(timeout) - spentTime;
|
||||
remainTime = Math.min(remainTime, currentTimeout);
|
||||
if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
unsubscribe(future);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RFuture<V> pollAsync(final long timeout, final TimeUnit unit) {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
final RPromise<V> promise = newPromise();
|
||||
|
||||
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
|
||||
tryAcquireFuture.addListener(new FutureListener<Long>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Long> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
Long currentTimeout = future.getNow();
|
||||
if (currentTimeout == null) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
|
||||
pollFuture.addListener(new FutureListener<V>() {
|
||||
@Override
|
||||
public void operationComplete(Future<V> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
promise.trySuccess(future.getNow());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
promise.trySuccess(null);
|
||||
}
|
||||
} else {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
remainTime = Math.min(remainTime, currentTimeout);
|
||||
if (remainTime <= 0) {
|
||||
promise.trySuccess(null);
|
||||
return;
|
||||
}
|
||||
|
||||
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
|
||||
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
|
||||
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
|
||||
@Override
|
||||
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
if (futureRef.get() != null) {
|
||||
futureRef.get().cancel();
|
||||
}
|
||||
|
||||
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
|
||||
}
|
||||
});
|
||||
if (!subscribeFuture.isDone()) {
|
||||
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
if (!subscribeFuture.isDone()) {
|
||||
subscribeFuture.cancel(false);
|
||||
promise.trySuccess(null);
|
||||
}
|
||||
}
|
||||
}, remainTime, TimeUnit.MILLISECONDS);
|
||||
futureRef.set(scheduledFuture);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void tryTakeAsync(final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<V> promise) {
|
||||
if (promise.isDone()) {
|
||||
unsubscribe(subscribeFuture);
|
||||
return;
|
||||
}
|
||||
|
||||
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
|
||||
tryAcquireFuture.addListener(new FutureListener<Long>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Long> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
unsubscribe(subscribeFuture);
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
Long currentTimeout = future.getNow();
|
||||
if (currentTimeout == null) {
|
||||
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.takeAsync();
|
||||
pollFuture.addListener(new FutureListener<V>() {
|
||||
@Override
|
||||
public void operationComplete(Future<V> future) throws Exception {
|
||||
unsubscribe(subscribeFuture);
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
promise.trySuccess(future.getNow());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
final RedissonLockEntry entry = getEntry();
|
||||
synchronized (entry) {
|
||||
if (entry.getLatch().tryAcquire()) {
|
||||
tryTakeAsync(subscribeFuture, promise);
|
||||
} else {
|
||||
final AtomicBoolean executed = new AtomicBoolean();
|
||||
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
|
||||
final Runnable listener = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
executed.set(true);
|
||||
if (futureRef.get() != null) {
|
||||
futureRef.get().cancel();
|
||||
}
|
||||
|
||||
tryTakeAsync(subscribeFuture, promise);
|
||||
}
|
||||
};
|
||||
entry.addListener(listener);
|
||||
|
||||
if (!executed.get()) {
|
||||
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout t) throws Exception {
|
||||
synchronized (entry) {
|
||||
if (entry.removeListener(listener)) {
|
||||
tryTakeAsync(subscribeFuture, promise);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, currentTimeout, TimeUnit.MILLISECONDS);
|
||||
futureRef.set(scheduledFuture);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
private void tryPollAsync(final long startTime, final long timeout, final TimeUnit unit,
|
||||
final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<V> promise) {
|
||||
if (promise.isDone()) {
|
||||
unsubscribe(subscribeFuture);
|
||||
return;
|
||||
}
|
||||
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime <= 0) {
|
||||
unsubscribe(subscribeFuture);
|
||||
promise.trySuccess(null);
|
||||
return;
|
||||
}
|
||||
|
||||
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
|
||||
tryAcquireFuture.addListener(new FutureListener<Long>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Long> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
unsubscribe(subscribeFuture);
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
Long currentTimeout = future.getNow();
|
||||
if (currentTimeout == null) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
|
||||
pollFuture.addListener(new FutureListener<V>() {
|
||||
@Override
|
||||
public void operationComplete(Future<V> future) throws Exception {
|
||||
unsubscribe(subscribeFuture);
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
promise.trySuccess(future.getNow());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
unsubscribe(subscribeFuture);
|
||||
promise.trySuccess(null);
|
||||
}
|
||||
} else {
|
||||
final RedissonLockEntry entry = getEntry();
|
||||
synchronized (entry) {
|
||||
if (entry.getLatch().tryAcquire()) {
|
||||
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
|
||||
} else {
|
||||
final AtomicBoolean executed = new AtomicBoolean();
|
||||
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
|
||||
final Runnable listener = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
executed.set(true);
|
||||
if (futureRef.get() != null) {
|
||||
futureRef.get().cancel();
|
||||
}
|
||||
|
||||
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
|
||||
}
|
||||
};
|
||||
entry.addListener(listener);
|
||||
|
||||
if (!executed.get()) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout t) throws Exception {
|
||||
synchronized (entry) {
|
||||
if (entry.removeListener(listener)) {
|
||||
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, remainTime, TimeUnit.MILLISECONDS);
|
||||
futureRef.set(scheduledFuture);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
|
||||
long startTime = System.currentTimeMillis();
|
||||
Long currentTimeout = tryAcquire();
|
||||
if (currentTimeout == null) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
RFuture<RedissonLockEntry> future = subscribe();
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (!future.awaitUninterruptibly(remainTime, TimeUnit.MILLISECONDS)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
currentTimeout = tryAcquire();
|
||||
if (currentTimeout == null) {
|
||||
spentTime = System.currentTimeMillis() - startTime;
|
||||
remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
spentTime = System.currentTimeMillis() - startTime;
|
||||
remainTime = unit.toMillis(timeout) - spentTime;
|
||||
remainTime = Math.min(remainTime, currentTimeout);
|
||||
if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
unsubscribe(future);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RFuture<V> pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
final RPromise<V> promise = newPromise();
|
||||
|
||||
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
|
||||
tryAcquireFuture.addListener(new FutureListener<Long>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Long> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
Long currentTimeout = future.getNow();
|
||||
if (currentTimeout == null) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
|
||||
pollFuture.addListener(new FutureListener<V>() {
|
||||
@Override
|
||||
public void operationComplete(Future<V> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
promise.trySuccess(future.getNow());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
promise.trySuccess(null);
|
||||
}
|
||||
} else {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
remainTime = Math.min(remainTime, currentTimeout);
|
||||
if (remainTime <= 0) {
|
||||
promise.trySuccess(null);
|
||||
return;
|
||||
}
|
||||
|
||||
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
|
||||
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
|
||||
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
|
||||
@Override
|
||||
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
if (futureRef.get() != null) {
|
||||
futureRef.get().cancel();
|
||||
}
|
||||
|
||||
tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName);
|
||||
}
|
||||
});
|
||||
if (!subscribeFuture.isDone()) {
|
||||
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
if (!subscribeFuture.isDone()) {
|
||||
subscribeFuture.cancel(false);
|
||||
promise.trySuccess(null);
|
||||
}
|
||||
}
|
||||
}, remainTime, TimeUnit.MILLISECONDS);
|
||||
futureRef.set(scheduledFuture);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void tryPollLastAndOfferFirstToAsync(final long startTime, final long timeout, final TimeUnit unit,
|
||||
final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<V> promise, final String queueName) {
|
||||
if (promise.isDone()) {
|
||||
unsubscribe(subscribeFuture);
|
||||
return;
|
||||
}
|
||||
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime <= 0) {
|
||||
unsubscribe(subscribeFuture);
|
||||
promise.trySuccess(null);
|
||||
return;
|
||||
}
|
||||
|
||||
RFuture<Long> tryAcquireFuture = tryAcquireAsync();
|
||||
tryAcquireFuture.addListener(new FutureListener<Long>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Long> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
unsubscribe(subscribeFuture);
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
Long currentTimeout = future.getNow();
|
||||
if (currentTimeout == null) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
if (remainTime > 0) {
|
||||
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
|
||||
pollFuture.addListener(new FutureListener<V>() {
|
||||
@Override
|
||||
public void operationComplete(Future<V> future) throws Exception {
|
||||
unsubscribe(subscribeFuture);
|
||||
if (!future.isSuccess()) {
|
||||
promise.tryFailure(future.cause());
|
||||
return;
|
||||
}
|
||||
|
||||
promise.trySuccess(future.getNow());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
unsubscribe(subscribeFuture);
|
||||
promise.trySuccess(null);
|
||||
}
|
||||
} else {
|
||||
final RedissonLockEntry entry = getEntry();
|
||||
synchronized (entry) {
|
||||
if (entry.getLatch().tryAcquire()) {
|
||||
tryPollAsync(startTime, timeout, unit, subscribeFuture, promise);
|
||||
} else {
|
||||
final AtomicBoolean executed = new AtomicBoolean();
|
||||
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
|
||||
final Runnable listener = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
executed.set(true);
|
||||
if (futureRef.get() != null) {
|
||||
futureRef.get().cancel();
|
||||
}
|
||||
|
||||
tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName);
|
||||
}
|
||||
};
|
||||
entry.addListener(listener);
|
||||
|
||||
if (!executed.get()) {
|
||||
long spentTime = System.currentTimeMillis() - startTime;
|
||||
long remainTime = unit.toMillis(timeout) - spentTime;
|
||||
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout t) throws Exception {
|
||||
synchronized (entry) {
|
||||
if (entry.removeListener(listener)) {
|
||||
tryPollLastAndOfferFirstToAsync(startTime, timeout, unit, subscribeFuture, promise, queueName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, remainTime, TimeUnit.MILLISECONDS);
|
||||
futureRef.set(scheduledFuture);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
/**
|
||||
* Copyright 2016 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
/**
|
||||
* Blocking queue with fair polling and
|
||||
* guarantees access order for poll and take methods.
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
* @param <V> value
|
||||
*/
|
||||
public interface RBlockingFairQueue<V> extends RBlockingQueue<V>, RDestroyable {
|
||||
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Copyright 2016 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public interface RLockReactive extends RExpirableReactive {
|
||||
|
||||
Publisher<Boolean> forceUnlock();
|
||||
|
||||
Publisher<Void> unlock();
|
||||
|
||||
Publisher<Void> unlock(long threadId);
|
||||
|
||||
Publisher<Boolean> tryLock();
|
||||
|
||||
Publisher<Void> lock();
|
||||
|
||||
Publisher<Void> lock(long threadId);
|
||||
|
||||
Publisher<Void> lock(long leaseTime, TimeUnit unit);
|
||||
|
||||
Publisher<Void> lock(long leaseTime, TimeUnit unit, long threadId);
|
||||
|
||||
Publisher<Boolean> tryLock(long threadId);
|
||||
|
||||
Publisher<Boolean> tryLock(long waitTime, TimeUnit unit);
|
||||
|
||||
Publisher<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
|
||||
|
||||
Publisher<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit, long threadId);
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Copyright 2016 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
/**
|
||||
* A {@code ReadWriteLock} maintains a pair of associated {@link
|
||||
* Lock locks}, one for read-only operations and one for writing.
|
||||
* The {@link #readLock read lock} may be held simultaneously by
|
||||
* multiple reader threads, so long as there are no writers. The
|
||||
* {@link #writeLock write lock} is exclusive.
|
||||
*
|
||||
* Works in non-fair mode. Therefore order of read and write
|
||||
* locking is unspecified.
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public interface RReadWriteLockReactive extends RExpirableReactive {
|
||||
|
||||
/**
|
||||
* Returns the lock used for reading.
|
||||
*
|
||||
* @return the lock used for reading
|
||||
*/
|
||||
RLockReactive readLock();
|
||||
|
||||
/**
|
||||
* Returns the lock used for writing.
|
||||
*
|
||||
* @return the lock used for writing
|
||||
*/
|
||||
RLockReactive writeLock();
|
||||
|
||||
}
|
@ -0,0 +1,119 @@
|
||||
/**
|
||||
* Copyright 2016 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.command;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscription;
|
||||
import org.redisson.api.RFuture;
|
||||
import org.redisson.api.RedissonReactiveClient;
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.protocol.RedisCommand;
|
||||
import org.redisson.connection.ConnectionManager;
|
||||
import org.redisson.connection.NodeSource;
|
||||
import org.redisson.misc.RPromise;
|
||||
import org.redisson.reactive.NettyFuturePublisher;
|
||||
|
||||
import reactor.fn.Supplier;
|
||||
import reactor.rx.action.support.DefaultSubscriber;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public class CommandReactiveBatchService extends CommandReactiveService {
|
||||
|
||||
private final CommandBatchService batchService;
|
||||
private final Queue<Publisher<?>> publishers = new ConcurrentLinkedQueue<Publisher<?>>();
|
||||
|
||||
public CommandReactiveBatchService(ConnectionManager connectionManager) {
|
||||
super(connectionManager);
|
||||
batchService = new CommandBatchService(connectionManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
|
||||
NettyFuturePublisher<R> publisher = new NettyFuturePublisher<R>(supplier);
|
||||
publishers.add(publisher);
|
||||
return publisher;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
|
||||
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt) {
|
||||
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt);
|
||||
}
|
||||
|
||||
public List<?> execute() {
|
||||
return get(executeAsync(0, 0, 0));
|
||||
}
|
||||
|
||||
public List<?> execute(long responseTimeout, int retryAttempts, long retryInterval) {
|
||||
return get(executeAsync(responseTimeout, retryAttempts, retryInterval));
|
||||
}
|
||||
|
||||
public RFuture<Void> executeAsyncVoid() {
|
||||
return executeAsyncVoid(false, 0, 0, 0);
|
||||
}
|
||||
|
||||
private RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
|
||||
for (Publisher<?> publisher : publishers) {
|
||||
publisher.subscribe(new DefaultSubscriber<Object>() {
|
||||
@Override
|
||||
public void onSubscribe(Subscription s) {
|
||||
s.request(1);
|
||||
}
|
||||
});
|
||||
}
|
||||
return batchService.executeAsyncVoid(noResult, responseTimeout, retryAttempts, retryInterval);
|
||||
}
|
||||
|
||||
public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) {
|
||||
get(executeSkipResultAsync(timeout, retryAttempts, retryInterval));
|
||||
}
|
||||
|
||||
public RFuture<Void> executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) {
|
||||
return executeAsyncVoid(true, timeout, retryAttempts, retryInterval);
|
||||
}
|
||||
|
||||
public RFuture<List<?>> executeAsync() {
|
||||
return executeAsync(0, 0, 0);
|
||||
}
|
||||
|
||||
public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
|
||||
for (Publisher<?> publisher : publishers) {
|
||||
publisher.subscribe(new DefaultSubscriber<Object>() {
|
||||
@Override
|
||||
public void onSubscribe(Subscription s) {
|
||||
s.request(1);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return batchService.executeAsync(responseTimeout, retryAttempts, retryInterval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
|
||||
batchService.enableRedissonReferenceSupport(redissonReactive);
|
||||
return super.enableRedissonReferenceSupport(redissonReactive);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,169 @@
|
||||
/**
|
||||
* Copyright 2016 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.reactive;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.redisson.RedissonLock;
|
||||
import org.redisson.api.RFuture;
|
||||
import org.redisson.api.RLockAsync;
|
||||
import org.redisson.api.RLockReactive;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
import org.redisson.command.CommandReactiveExecutor;
|
||||
|
||||
import reactor.fn.Supplier;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public class RedissonLockReactive extends RedissonExpirableReactive implements RLockReactive {
|
||||
|
||||
private final RLockAsync instance;
|
||||
|
||||
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) {
|
||||
super(connectionManager, name);
|
||||
instance = createLock(connectionManager, name, id);
|
||||
}
|
||||
|
||||
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
|
||||
return new RedissonLock(commandExecutor, name, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> forceUnlock() {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.forceUnlockAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> unlock() {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.unlockAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> unlock(final long threadId) {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.unlockAsync(threadId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryLock() {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryLockAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> lock() {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.lockAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> lock(final long threadId) {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.lockAsync(threadId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> lock(final long leaseTime, final TimeUnit unit) {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.lockAsync(leaseTime, unit);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> lock(final long leaseTime, final TimeUnit unit, final long threadId) {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.lockAsync(leaseTime, unit, threadId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryLock(final long threadId) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryLockAsync(threadId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryLock(final long waitTime, final TimeUnit unit) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryLockAsync(waitTime, unit);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryLockAsync(waitTime, leaseTime, unit);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit, final long threadId) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryLockAsync(waitTime, leaseTime, unit, threadId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
/**
|
||||
* Copyright 2016 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.reactive;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import org.redisson.RedissonReadWriteLock;
|
||||
import org.redisson.api.RLockAsync;
|
||||
import org.redisson.api.RLockReactive;
|
||||
import org.redisson.api.RReadWriteLock;
|
||||
import org.redisson.api.RReadWriteLockReactive;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
import org.redisson.command.CommandReactiveExecutor;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public class RedissonReadWriteLockReactive extends RedissonExpirableReactive implements RReadWriteLockReactive {
|
||||
|
||||
private final RReadWriteLock instance;
|
||||
private final UUID id;
|
||||
|
||||
public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name, UUID id) {
|
||||
super(commandExecutor, name);
|
||||
this.id = id;
|
||||
this.instance = new RedissonReadWriteLock(commandExecutor, name, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLockReactive readLock() {
|
||||
return new RedissonLockReactive(commandExecutor, getName(), id) {
|
||||
@Override
|
||||
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
|
||||
return instance.readLock();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLockReactive writeLock() {
|
||||
return new RedissonLockReactive(commandExecutor, getName(), id) {
|
||||
@Override
|
||||
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
|
||||
return instance.writeLock();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -1,226 +0,0 @@
|
||||
package org.redisson;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.redisson.api.RBlockingFairQueue;
|
||||
import org.redisson.api.RBlockingQueue;
|
||||
import org.redisson.api.RedissonClient;
|
||||
|
||||
public class RedissonBlockingFairQueueTest extends BaseTest {
|
||||
|
||||
@Test
|
||||
public void testTimeout() throws InterruptedException {
|
||||
int size = 1000;
|
||||
CountDownLatch latch = new CountDownLatch(size);
|
||||
AtomicInteger t1Counter = new AtomicInteger();
|
||||
AtomicInteger t2Counter = new AtomicInteger();
|
||||
AtomicInteger t3Counter = new AtomicInteger();
|
||||
|
||||
RedissonClient redisson1 = createInstance();
|
||||
RBlockingFairQueue<String> queue1 = redisson1.getBlockingFairQueue("test");
|
||||
Thread t1 = new Thread("test-thread1") {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
String a = queue1.poll(5, TimeUnit.SECONDS);
|
||||
if (latch.getCount() == 0) {
|
||||
break;
|
||||
}
|
||||
if (a == null) {
|
||||
continue;
|
||||
}
|
||||
latch.countDown();
|
||||
t1Counter.incrementAndGet();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
RedissonClient redisson2 = createInstance();
|
||||
RBlockingFairQueue<String> queue2 = redisson2.getBlockingFairQueue("test");
|
||||
Thread t2 = new Thread("test-thread2") {
|
||||
public void run() {
|
||||
try {
|
||||
String a = queue2.poll(2, TimeUnit.SECONDS);
|
||||
if (a != null) {
|
||||
latch.countDown();
|
||||
t2Counter.incrementAndGet();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
RedissonClient redisson3 = createInstance();
|
||||
RBlockingFairQueue<String> queue3 = redisson3.getBlockingFairQueue("test");
|
||||
Thread t3 = new Thread("test-thread3") {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
String a = queue3.poll(5, TimeUnit.SECONDS);
|
||||
if (latch.getCount() == 0) {
|
||||
break;
|
||||
}
|
||||
if (a == null) {
|
||||
continue;
|
||||
}
|
||||
latch.countDown();
|
||||
t3Counter.incrementAndGet();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
t1.start();
|
||||
t1.join(500);
|
||||
t2.start();
|
||||
t2.join(500);
|
||||
t3.start();
|
||||
t3.join(500);
|
||||
|
||||
RBlockingQueue<String> queue = redisson.getBlockingFairQueue("test");
|
||||
assertThat(redisson.getList("{" + queue.getName() + "}:list").size()).isEqualTo(3);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
queue.add("" + i);
|
||||
}
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
t3.join();
|
||||
|
||||
assertThat(latch.await(50, TimeUnit.SECONDS)).isTrue();
|
||||
|
||||
assertThat(t1Counter.get()).isBetween(499, 500);
|
||||
assertThat(t2Counter.get()).isEqualTo(1);
|
||||
assertThat(t3Counter.get()).isBetween(499, 500);
|
||||
|
||||
assertThat(redisson.getList("{" + queue.getName() + "}:list").size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairness() throws InterruptedException {
|
||||
int size = 1000;
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(size);
|
||||
AtomicInteger t1Counter = new AtomicInteger();
|
||||
AtomicInteger t2Counter = new AtomicInteger();
|
||||
AtomicInteger t3Counter = new AtomicInteger();
|
||||
AtomicInteger t4Counter = new AtomicInteger();
|
||||
|
||||
RedissonClient redisson1 = createInstance();
|
||||
RBlockingFairQueue<String> queue1 = redisson1.getBlockingFairQueue("test");
|
||||
Thread t1 = new Thread("test-thread1") {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
String a = queue1.poll(1, TimeUnit.SECONDS);
|
||||
if (a == null) {
|
||||
break;
|
||||
}
|
||||
latch.countDown();
|
||||
t1Counter.incrementAndGet();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
RedissonClient redisson2 = createInstance();
|
||||
RBlockingFairQueue<String> queue2 = redisson2.getBlockingFairQueue("test");
|
||||
Thread t2 = new Thread("test-thread2") {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
String a = queue2.poll(1, TimeUnit.SECONDS);
|
||||
if (a == null) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(50);
|
||||
latch.countDown();
|
||||
t2Counter.incrementAndGet();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
RedissonClient redisson3 = createInstance();
|
||||
RBlockingFairQueue<String> queue3 = redisson3.getBlockingFairQueue("test");
|
||||
Thread t3 = new Thread("test-thread3") {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
String a = queue3.poll(1, TimeUnit.SECONDS);
|
||||
if (a == null) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
latch.countDown();
|
||||
t3Counter.incrementAndGet();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
RedissonClient redisson4 = createInstance();
|
||||
RBlockingFairQueue<String> queue4 = redisson4.getBlockingFairQueue("test");
|
||||
Thread t4 = new Thread("test-thread4") {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
String a = queue4.poll(1, TimeUnit.SECONDS);
|
||||
if (a == null) {
|
||||
break;
|
||||
}
|
||||
latch.countDown();
|
||||
t4Counter.incrementAndGet();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
RBlockingQueue<String> queue = redisson.getBlockingFairQueue("test");
|
||||
for (int i = 0; i < size; i++) {
|
||||
queue.add("" + i);
|
||||
}
|
||||
|
||||
t1.start();
|
||||
t2.start();
|
||||
t3.start();
|
||||
t4.start();
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
t3.join();
|
||||
t4.join();
|
||||
|
||||
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
|
||||
|
||||
queue1.destroy();
|
||||
queue2.destroy();
|
||||
queue3.destroy();
|
||||
queue4.destroy();
|
||||
redisson1.shutdown();
|
||||
redisson2.shutdown();
|
||||
redisson3.shutdown();
|
||||
redisson4.shutdown();
|
||||
|
||||
assertThat(t1Counter.get()).isEqualTo(250);
|
||||
assertThat(t2Counter.get()).isEqualTo(250);
|
||||
assertThat(t3Counter.get()).isEqualTo(250);
|
||||
assertThat(t4Counter.get()).isEqualTo(250);
|
||||
assertThat(redisson.getKeys().count()).isEqualTo(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue