RSemaphore.reducePermits method added. #603

pull/605/head
Nikita 9 years ago
parent 0e5e4014f7
commit 8943f0655d

@ -135,6 +135,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
private void tryAcquireAsync(final AtomicLong time, final int permits, final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<Boolean> result) {
if (result.isDone()) {
unsubscribe(subscribeFuture);
return;
}
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -144,10 +149,12 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
result.tryFailure(future.cause());
return;
}
if (future.getNow()) {
unsubscribe(subscribeFuture);
result.trySuccess(true);
if (!result.trySuccess(true)) {
releaseAsync(permits);
}
return;
}
@ -170,8 +177,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void run() {
executed.set(true);
if (futureRef.get() != null) {
futureRef.get().cancel();
if (futureRef.get() != null && !futureRef.get().cancel()) {
entry.getLatch().release();
return;
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
@ -206,19 +214,26 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
private void acquireAsync(final int permits, final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<Void> result) {
if (result.isDone()) {
unsubscribe(subscribeFuture);
return;
}
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
unsubscribe(subscribeFuture);
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (future.getNow()) {
unsubscribe(subscribeFuture);
result.setSuccess(null);
if (!result.trySuccess(null)) {
releaseAsync(permits);
}
return;
}
@ -321,12 +336,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (future.getNow()) {
result.setSuccess(true);
if (!result.trySuccess(true)) {
releaseAsync(permits);
}
return;
}
@ -337,14 +354,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (futureRef.get() != null) {
futureRef.get().cancel();
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
@ -455,5 +472,42 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
+ "end;",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
@Override
public boolean trySetPermits(int permits) {
return get(trySetPermitsAsync(permits));
}
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
@Override
public void reducePermits(int permits) {
get(reducePermitsAsync(permits));
}
@Override
public RFuture<Void> reducePermitsAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "value = 0;"
+ "end;"
+ "redis.call('set', KEYS[1], value - ARGV[1]); ",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
}

@ -3,22 +3,43 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assume;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.api.RSemaphore;
public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testTrySetPermits() {
RSemaphore s = redisson.getSemaphore("test");
assertThat(s.trySetPermits(10)).isTrue();
assertThat(s.availablePermits()).isEqualTo(10);
assertThat(s.trySetPermits(15)).isFalse();
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testReducePermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.trySetPermits(10);
s.acquire(10);
s.reducePermits(5);
assertThat(s.availablePermits()).isEqualTo(-5);
s.release(10);
assertThat(s.availablePermits()).isEqualTo(5);
s.acquire(5);
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testBlockingAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
s.acquire();
Thread t = new Thread() {
@ -46,7 +67,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testBlockingNAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(5);
s.trySetPermits(5);
s.acquire(3);
Thread t = new Thread() {
@ -80,7 +101,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testTryNAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(5);
s.trySetPermits(5);
assertThat(s.tryAcquire(3)).isTrue();
Thread t = new Thread() {
@ -126,7 +147,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testDrainPermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.trySetPermits(10);
s.acquire(3);
assertThat(s.drainPermits()).isEqualTo(7);
@ -136,7 +157,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testReleaseAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.trySetPermits(10);
s.acquire();
assertThat(s.availablePermits()).isEqualTo(9);
s.release();
@ -153,7 +174,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
int iterations = 15;
testSingleInstanceConcurrency(iterations, r -> {
@ -178,7 +199,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
testMultiInstanceConcurrency(16, r -> {
for (int i = 0; i < iterations; i++) {
@ -208,7 +229,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
testMultiInstanceConcurrency(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
@ -233,7 +254,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.trySetPermits(10);
final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits());
final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());

Loading…
Cancel
Save