diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 000eff669..6ab2dc169 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -135,6 +135,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } private void tryAcquireAsync(final AtomicLong time, final int permits, final RFuture subscribeFuture, final RPromise result) { + if (result.isDone()) { + unsubscribe(subscribeFuture); + return; + } + RFuture tryAcquireFuture = tryAcquireAsync(permits); tryAcquireFuture.addListener(new FutureListener() { @Override @@ -144,10 +149,12 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { result.tryFailure(future.cause()); return; } - + if (future.getNow()) { unsubscribe(subscribeFuture); - result.trySuccess(true); + if (!result.trySuccess(true)) { + releaseAsync(permits); + } return; } @@ -170,8 +177,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public void run() { executed.set(true); - if (futureRef.get() != null) { - futureRef.get().cancel(); + if (futureRef.get() != null && !futureRef.get().cancel()) { + entry.getLatch().release(); + return; } long elapsed = System.currentTimeMillis() - current; time.addAndGet(-elapsed); @@ -206,19 +214,26 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } private void acquireAsync(final int permits, final RFuture subscribeFuture, final RPromise result) { + if (result.isDone()) { + unsubscribe(subscribeFuture); + return; + } + RFuture tryAcquireFuture = tryAcquireAsync(permits); tryAcquireFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { unsubscribe(subscribeFuture); - result.setFailure(future.cause()); + result.tryFailure(future.cause()); return; } if (future.getNow()) { unsubscribe(subscribeFuture); - result.setSuccess(null); + if (!result.trySuccess(null)) { + releaseAsync(permits); + } return; } @@ -321,12 +336,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - result.setFailure(future.cause()); + result.tryFailure(future.cause()); return; } if (future.getNow()) { - result.setSuccess(true); + if (!result.trySuccess(true)) { + releaseAsync(permits); + } return; } @@ -337,14 +354,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - result.setFailure(future.cause()); + result.tryFailure(future.cause()); return; } if (futureRef.get() != null) { futureRef.get().cancel(); } - + long elapsed = System.currentTimeMillis() - current; time.addAndGet(-elapsed); @@ -455,5 +472,42 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { + "end;", Arrays.asList(getName(), getChannelName()), permits); } + + @Override + public boolean trySetPermits(int permits) { + return get(trySetPermitsAsync(permits)); + } + + @Override + public RFuture trySetPermitsAsync(int permits) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('get', KEYS[1]); " + + "if (value == false or value == 0) then " + + "redis.call('set', KEYS[1], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(getName(), getChannelName()), permits); + } + @Override + public void reducePermits(int permits) { + get(reducePermitsAsync(permits)); + } + + @Override + public RFuture reducePermitsAsync(int permits) { + if (permits < 0) { + throw new IllegalArgumentException(); + } + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + "local value = redis.call('get', KEYS[1]); " + + "if (value == false) then " + + "value = 0;" + + "end;" + + "redis.call('set', KEYS[1], value - ARGV[1]); ", + Arrays.asList(getName(), getChannelName()), permits); + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java index 391234f8c..dfccf562f 100644 --- a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -3,22 +3,43 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.junit.Assume; +import org.junit.Assume; import org.junit.Test; import org.redisson.api.RSemaphore; public class RedissonSemaphoreTest extends BaseConcurrentTest { + @Test + public void testTrySetPermits() { + RSemaphore s = redisson.getSemaphore("test"); + assertThat(s.trySetPermits(10)).isTrue(); + assertThat(s.availablePermits()).isEqualTo(10); + assertThat(s.trySetPermits(15)).isFalse(); + assertThat(s.availablePermits()).isEqualTo(10); + } + + @Test + public void testReducePermits() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + s.trySetPermits(10); + + s.acquire(10); + s.reducePermits(5); + assertThat(s.availablePermits()).isEqualTo(-5); + s.release(10); + assertThat(s.availablePermits()).isEqualTo(5); + s.acquire(5); + assertThat(s.availablePermits()).isEqualTo(0); + } + @Test public void testBlockingAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); s.acquire(); Thread t = new Thread() { @@ -46,7 +67,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testBlockingNAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(5); + s.trySetPermits(5); s.acquire(3); Thread t = new Thread() { @@ -80,7 +101,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testTryNAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(5); + s.trySetPermits(5); assertThat(s.tryAcquire(3)).isTrue(); Thread t = new Thread() { @@ -126,7 +147,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testDrainPermits() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(10); + s.trySetPermits(10); s.acquire(3); assertThat(s.drainPermits()).isEqualTo(7); @@ -136,7 +157,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testReleaseAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(10); + s.trySetPermits(10); s.acquire(); assertThat(s.availablePermits()).isEqualTo(9); s.release(); @@ -153,7 +174,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); int iterations = 15; testSingleInstanceConcurrency(iterations, r -> { @@ -178,7 +199,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); testMultiInstanceConcurrency(16, r -> { for (int i = 0; i < iterations; i++) { @@ -208,7 +229,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); testMultiInstanceConcurrency(iterations, r -> { RSemaphore s1 = r.getSemaphore("test"); @@ -233,7 +254,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(10); + s.trySetPermits(10); final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits()); final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());