From c3dd888b9676c086fc7be348cef4d61ffa490715 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 17 Dec 2015 14:33:03 +0300 Subject: [PATCH] RSemaphore improvements and tests. #207 --- .../java/org/redisson/RedissonClient.java | 6 + .../java/org/redisson/RedissonSemaphore.java | 43 ++- .../client/protocol/RedisCommands.java | 1 + .../java/org/redisson/core/RSemaphore.java | 155 ++++++++++- .../org/redisson/RedissonSemaphoreTest.java | 260 ++++++++++++++++++ 5 files changed, 456 insertions(+), 9 deletions(-) create mode 100644 src/test/java/org/redisson/RedissonSemaphoreTest.java diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 022211c90..b133b2885 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -224,6 +224,12 @@ public interface RedissonClient { */ RMap getMap(String name, Codec codec); + /** + * Returns semaphore instance by name + * + * @param name of semaphore + * @return + */ RSemaphore getSemaphore(String name); /** diff --git a/src/main/java/org/redisson/RedissonSemaphore.java b/src/main/java/org/redisson/RedissonSemaphore.java index 5173c87e7..60ae2eb31 100644 --- a/src/main/java/org/redisson/RedissonSemaphore.java +++ b/src/main/java/org/redisson/RedissonSemaphore.java @@ -30,9 +30,9 @@ import org.redisson.pubsub.LockPubSub; import io.netty.util.concurrent.Future; /** - * Distributed implementation of {@link java.util.concurrent.locks.Lock} - * Implements reentrant lock.
- * Lock will be removed automatically if client disconnects. + * Distributed and concurrent implementation of {@link java.util.concurrent.Semaphore}. + *

+ * Works in non-fair mode. Therefore order of acquiring is unpredictable. * * @author Nikita Koksharov * @@ -93,10 +93,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public boolean tryAcquire(int permits) { + if (permits < 0) { + throw new IllegalArgumentException("Permits amount can't be negative"); + } + return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + - "if (value ~= false and value >= ARGV[1]) then " + - "redis.call('decrby', KEYS[1], ARGV[1]); " + + "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + + "local val = redis.call('decrby', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", @@ -162,12 +166,29 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public void release(int permits) { + if (permits < 0) { + throw new IllegalArgumentException("Permits amount can't be negative"); + } + commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_OBJECT, "redis.call('incrby', KEYS[1], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[2]); ", Arrays.asList(getName(), getChannelName()), permits, unlockMessage); } + @Override + public int drainPermits() { + Long res = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + "local value = redis.call('get', KEYS[1]); " + + "if (value == false or value == 0) then " + + "return 0; " + + "end; " + + "redis.call('set', KEYS[1], 0); " + + "return value;", + Collections.singletonList(getName())); + return res.intValue(); + } + @Override public int availablePermits() { Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.GET, getName()); @@ -177,4 +198,16 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { return res.intValue(); } + @Override + public void setPermits(int permits) { + Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + "local value = redis.call('get', KEYS[1]); " + + "if (value == false or value == 0) then " + + "redis.call('set', KEYS[1], ARGV[2]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "end;", + Arrays.asList(getName(), getChannelName()), unlockMessage, permits); + get(f); + } + } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index e8eed682a..d72d1ae8b 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -152,6 +152,7 @@ public interface RedisCommands { RedisStrictCommand EVAL_STRING = new RedisStrictCommand("EVAL", new StringReplayDecoder()); RedisStrictCommand EVAL_INTEGER = new RedisStrictCommand("EVAL", new IntegerReplayConvertor()); RedisStrictCommand EVAL_LONG = new RedisStrictCommand("EVAL"); + RedisStrictCommand EVAL_VOID = new RedisStrictCommand("EVAL", new VoidReplayConvertor()); RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); RedisCommand EVAL_OBJECT = new RedisCommand("EVAL"); RedisCommand EVAL_MAP_VALUE = new RedisCommand("EVAL", ValueType.MAP_VALUE); diff --git a/src/main/java/org/redisson/core/RSemaphore.java b/src/main/java/org/redisson/core/RSemaphore.java index ce70f957f..a77a00b4a 100644 --- a/src/main/java/org/redisson/core/RSemaphore.java +++ b/src/main/java/org/redisson/core/RSemaphore.java @@ -18,32 +18,179 @@ package org.redisson.core; import java.util.concurrent.TimeUnit; /** - * Distributed implementation of {@link java.util.concurrent.locks.Lock} - * Implements reentrant lock. - * Use {@link RSemaphore#getHoldCount()} to get a holds count. + * Distributed and concurrent implementation of {@link java.util.concurrent.Semaphore}. + *

+ * Works in non-fair mode. Therefore order of acquiring is unpredictable. * * @author Nikita Koksharov * */ - public interface RSemaphore extends RExpirable { + /** + * Acquires a permit from this semaphore, blocking until one is + * available, or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread. + *
+ * + * @throws InterruptedException if the current thread is interrupted + */ void acquire() throws InterruptedException; + /** + * Acquires the given number of permits from this semaphore, + * blocking until all are available, + * or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires the given number of permits, if they are available, + * and returns immediately, reducing the number of available permits + * by the given amount. + * + *

If insufficient permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes one of the {@link #release() release} + * methods for this semaphore, the current thread is next to be assigned + * permits and the number of available permits satisfies this request; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread. + *
+ * + * @param permits the number of permits to acquire + * @throws InterruptedException if the current thread is interrupted + * @throws IllegalArgumentException if {@code permits} is negative + */ void acquire(int permits) throws InterruptedException; + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ boolean tryAcquire(); + /** + * Acquires the given number of permits only if all are available at the + * time of invocation. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by given number of permitss. + * + *

If no permits are available then this method will return + * immediately with the value {@code false}. + * + * @param permits the number of permits to acquire + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ boolean tryAcquire(int permits); + /** + * Acquires a permit from this semaphore, if one becomes available + * within the given waiting time and the current thread has not + * been {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of three things happens: + *

    + *
  • Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + *
  • The specified waiting time elapses. + *
+ * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + * @throws InterruptedException if the current thread is interrupted + */ boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException; boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException; + /** + * Releases a permit, returning it to the semaphore. + * + *

Releases a permit, increasing the number of available permits by + * one. If any threads of Redisson client are trying to acquire a permit, + * then one is selected and given the permit that was just released. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link #acquire}. + * Correct usage of a semaphore is established by programming convention + * in the application. + */ void release(); + /** + * Releases the given number of permits, returning them to the semaphore. + * + *

Releases the given number of permits, increasing the number of available permits by + * the given number of permits. If any threads of Redisson client are trying to + * acquire a permits, then next threads is selected and tries to acquire the permits that was just released. + * + *

There is no requirement that a thread that releases a permits must + * have acquired that permit by calling {@link #acquire}. + * Correct usage of a semaphore is established by programming convention + * in the application. + */ void release(int permits); + /** + * Returns the current number of available permits. + * + * @return number of available permits + */ int availablePermits(); + /** + * Acquires and returns all permits that are immediately available. + * + * @return the number of permits acquired + */ + int drainPermits(); + + /** + * Sets new number of permits. + * + * @param count - number of times {@link #countDown} must be invoked + * before threads can pass through {@link #await} + */ + void setPermits(int permits); + } diff --git a/src/test/java/org/redisson/RedissonSemaphoreTest.java b/src/test/java/org/redisson/RedissonSemaphoreTest.java new file mode 100644 index 000000000..7fbffc7c8 --- /dev/null +++ b/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -0,0 +1,260 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.redisson.core.RSemaphore; + +public class RedissonSemaphoreTest extends BaseConcurrentTest { + + @Test + public void testBlockingAcquire() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(1); + s.acquire(); + + Thread t = new Thread() { + @Override + public void run() { + RSemaphore s = redisson.getSemaphore("test"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + s.release(); + } + }; + + t.start(); + + assertThat(s.availablePermits()).isEqualTo(0); + s.acquire(); + assertThat(s.tryAcquire()).isFalse(); + assertThat(s.availablePermits()).isEqualTo(0); + } + + @Test + public void testBlockingNAcquire() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(5); + s.acquire(2); + + Thread t = new Thread() { + @Override + public void run() { + RSemaphore s = redisson.getSemaphore("test"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + s.release(); + } + }; + + assertThat(s.availablePermits()).isEqualTo(3); + t.start(); + + s.acquire(4); + assertThat(s.availablePermits()).isEqualTo(0); + } + + @Test + public void testTryNAcquire() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(5); + assertThat(s.tryAcquire(2)).isTrue(); + + Thread t = new Thread() { + @Override + public void run() { + RSemaphore s = redisson.getSemaphore("test"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + s.release(); + } + }; + + assertThat(s.tryAcquire(4)).isFalse(); + + t.start(); + t.join(1); + + long startTime = System.currentTimeMillis(); + assertThat(s.tryAcquire(4, 1, TimeUnit.SECONDS)).isTrue(); + assertThat(System.currentTimeMillis() - startTime).isBetween(900L, 1020L); + assertThat(s.availablePermits()).isEqualTo(0); + } + + @Test + public void testReleaseWithoutPermits() { + RSemaphore s = redisson.getSemaphore("test"); + s.release(); + + assertThat(s.availablePermits()).isEqualTo(1); + } + + @Test + public void testDrainPermits() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(10); + s.acquire(3); + + assertThat(s.drainPermits()).isEqualTo(7); + assertThat(s.availablePermits()).isEqualTo(0); + } + + @Test + public void testReleaseAcquire() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(10); + s.acquire(); + assertThat(s.availablePermits()).isEqualTo(9); + s.release(); + assertThat(s.availablePermits()).isEqualTo(10); + s.acquire(5); + assertThat(s.availablePermits()).isEqualTo(5); + s.release(5); + assertThat(s.availablePermits()).isEqualTo(10); + } + + + @Test + public void testConcurrency_SingleInstance() throws InterruptedException { + final AtomicInteger lockedCounter = new AtomicInteger(); + + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(1); + + int iterations = 15; + testSingleInstanceConcurrency(iterations, new RedissonRunnable() { + @Override + public void run(RedissonClient redisson) { + RSemaphore s = redisson.getSemaphore("test"); + try { + s.acquire(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + int value = lockedCounter.get(); + lockedCounter.set(value + 1); + s.release(); + } + }); + + assertThat(lockedCounter.get()).isEqualTo(iterations); + } + + @Test + public void testConcurrencyLoop_MultiInstance() throws InterruptedException { + final int iterations = 100; + final AtomicInteger lockedCounter = new AtomicInteger(); + + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(1); + + testMultiInstanceConcurrency(16, new RedissonRunnable() { + @Override + public void run(RedissonClient redisson) { + for (int i = 0; i < iterations; i++) { + try { + redisson.getSemaphore("test").acquire(); + } catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + int value = lockedCounter.get(); + lockedCounter.set(value + 1); + redisson.getSemaphore("test").release(); + } + } + }); + + assertThat(lockedCounter.get()).isEqualTo(16 * iterations); + } + + @Test + public void testConcurrency_MultiInstance_1_permits() throws InterruptedException { + int iterations = 100; + final AtomicInteger lockedCounter = new AtomicInteger(); + + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(1); + + testMultiInstanceConcurrency(iterations, new RedissonRunnable() { + @Override + public void run(RedissonClient redisson) { + RSemaphore s = redisson.getSemaphore("test"); + try { + s.acquire(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + int value = lockedCounter.get(); + lockedCounter.set(value + 1); + s.release(); + } + }); + + assertThat(lockedCounter.get()).isEqualTo(iterations); + } + + @Test + public void testConcurrency_MultiInstance_10_permits() throws InterruptedException { + int iterations = 100; + final AtomicInteger lockedCounter = new AtomicInteger(); + + RSemaphore s = redisson.getSemaphore("test"); + s.setPermits(10); + + final CyclicBarrier barrier = new CyclicBarrier(10); + testMultiInstanceConcurrency(iterations, new RedissonRunnable() { + @Override + public void run(RedissonClient redisson) { + RSemaphore s = redisson.getSemaphore("test"); + try { + s.acquire(); + + barrier.await(); + + assertThat(s.availablePermits()).isEqualTo(0); + assertThat(s.tryAcquire()).isFalse(); + + Thread.sleep(50); + + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + int value = lockedCounter.get(); + lockedCounter.set(value + 1); + s.release(); + } + }); + + assertThat(lockedCounter.get()).isLessThan(iterations); + } + +}