RSemaphore improvements and tests. #207

pull/365/head
Nikita 9 years ago
parent aef3214d23
commit c3dd888b96

@ -224,6 +224,12 @@ public interface RedissonClient {
*/
<K, V> RMap<K, V> getMap(String name, Codec codec);
/**
* Returns semaphore instance by name
*
* @param name of semaphore
* @return
*/
RSemaphore getSemaphore(String name);
/**

@ -30,9 +30,9 @@ import org.redisson.pubsub.LockPubSub;
import io.netty.util.concurrent.Future;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.<br>
* Lock will be removed automatically if client disconnects.
* Distributed and concurrent implementation of {@link java.util.concurrent.Semaphore}.
* <p/>
* Works in non-fair mode. Therefore order of acquiring is unpredictable.
*
* @author Nikita Koksharov
*
@ -93,10 +93,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public boolean tryAcquire(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and value >= ARGV[1]) then " +
"redis.call('decrby', KEYS[1], ARGV[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
@ -162,12 +166,29 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void release(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_OBJECT,
"redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], ARGV[2]); ",
Arrays.<Object>asList(getName(), getChannelName()), permits, unlockMessage);
}
@Override
public int drainPermits() {
Long res = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then " +
"return 0; " +
"end; " +
"redis.call('set', KEYS[1], 0); " +
"return value;",
Collections.<Object>singletonList(getName()));
return res.intValue();
}
@Override
public int availablePermits() {
Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.GET, getName());
@ -177,4 +198,16 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return res.intValue();
}
@Override
public void setPermits(int permits) {
Future<Void> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getName(), getChannelName()), unlockMessage, permits);
get(f);
}
}

@ -152,6 +152,7 @@ public interface RedisCommands {
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand<String>("EVAL", new StringReplayDecoder());
RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor());
RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL");
RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor());
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL");
RedisCommand<Object> EVAL_MAP_VALUE = new RedisCommand<Object>("EVAL", ValueType.MAP_VALUE);

@ -18,32 +18,179 @@ package org.redisson.core;
import java.util.concurrent.TimeUnit;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.
* Use {@link RSemaphore#getHoldCount()} to get a holds count.
* Distributed and concurrent implementation of {@link java.util.concurrent.Semaphore}.
* <p/>
* Works in non-fair mode. Therefore order of acquiring is unpredictable.
*
* @author Nikita Koksharov
*
*/
public interface RSemaphore extends RExpirable {
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @throws InterruptedException if the current thread is interrupted
*/
void acquire() throws InterruptedException;
/**
* Acquires the given number of permits from this semaphore,
* blocking until all are available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires the given number of permits, if they are available,
* and returns immediately, reducing the number of available permits
* by the given amount.
*
* <p>If insufficient permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes one of the {@link #release() release}
* methods for this semaphore, the current thread is next to be assigned
* permits and the number of available permits satisfies this request; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param permits the number of permits to acquire
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if {@code permits} is negative
*/
void acquire(int permits) throws InterruptedException;
/**
* Acquires a permit only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code false}.
*
* @return {@code true} if a permit was acquired and {@code false}
* otherwise
*/
boolean tryAcquire();
/**
* Acquires the given number of permits only if all are available at the
* time of invocation.
*
* <p>Acquires a permits, if all are available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by given number of permitss.
*
* <p>If no permits are available then this method will return
* immediately with the value {@code false}.
*
* @param permits the number of permits to acquire
* @return {@code true} if a permit was acquired and {@code false}
* otherwise
*/
boolean tryAcquire(int permits);
/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the value {@code true} is returned.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param timeout the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if a permit was acquired and {@code false}
* if the waiting time elapsed before a permit was acquired
* @throws InterruptedException if the current thread is interrupted
*/
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;
boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException;
/**
* Releases a permit, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*/
void release();
/**
* Releases the given number of permits, returning them to the semaphore.
*
* <p>Releases the given number of permits, increasing the number of available permits by
* the given number of permits. If any threads of Redisson client are trying to
* acquire a permits, then next threads is selected and tries to acquire the permits that was just released.
*
* <p>There is no requirement that a thread that releases a permits must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*/
void release(int permits);
/**
* Returns the current number of available permits.
*
* @return number of available permits
*/
int availablePermits();
/**
* Acquires and returns all permits that are immediately available.
*
* @return the number of permits acquired
*/
int drainPermits();
/**
* Sets new number of permits.
*
* @param count - number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
*/
void setPermits(int permits);
}

@ -0,0 +1,260 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.redisson.core.RSemaphore;
public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testBlockingAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.acquire();
Thread t = new Thread() {
@Override
public void run() {
RSemaphore s = redisson.getSemaphore("test");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
s.release();
}
};
t.start();
assertThat(s.availablePermits()).isEqualTo(0);
s.acquire();
assertThat(s.tryAcquire()).isFalse();
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testBlockingNAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(5);
s.acquire(2);
Thread t = new Thread() {
@Override
public void run() {
RSemaphore s = redisson.getSemaphore("test");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
s.release();
}
};
assertThat(s.availablePermits()).isEqualTo(3);
t.start();
s.acquire(4);
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testTryNAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(5);
assertThat(s.tryAcquire(2)).isTrue();
Thread t = new Thread() {
@Override
public void run() {
RSemaphore s = redisson.getSemaphore("test");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
s.release();
}
};
assertThat(s.tryAcquire(4)).isFalse();
t.start();
t.join(1);
long startTime = System.currentTimeMillis();
assertThat(s.tryAcquire(4, 1, TimeUnit.SECONDS)).isTrue();
assertThat(System.currentTimeMillis() - startTime).isBetween(900L, 1020L);
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testReleaseWithoutPermits() {
RSemaphore s = redisson.getSemaphore("test");
s.release();
assertThat(s.availablePermits()).isEqualTo(1);
}
@Test
public void testDrainPermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.acquire(3);
assertThat(s.drainPermits()).isEqualTo(7);
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testReleaseAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.acquire();
assertThat(s.availablePermits()).isEqualTo(9);
s.release();
assertThat(s.availablePermits()).isEqualTo(10);
s.acquire(5);
assertThat(s.availablePermits()).isEqualTo(5);
s.release(5);
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testConcurrency_SingleInstance() throws InterruptedException {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
int iterations = 15;
testSingleInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSemaphore s = redisson.getSemaphore("test");
try {
s.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s.release();
}
});
assertThat(lockedCounter.get()).isEqualTo(iterations);
}
@Test
public void testConcurrencyLoop_MultiInstance() throws InterruptedException {
final int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
testMultiInstanceConcurrency(16, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
for (int i = 0; i < iterations; i++) {
try {
redisson.getSemaphore("test").acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
redisson.getSemaphore("test").release();
}
}
});
assertThat(lockedCounter.get()).isEqualTo(16 * iterations);
}
@Test
public void testConcurrency_MultiInstance_1_permits() throws InterruptedException {
int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
testMultiInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSemaphore s = redisson.getSemaphore("test");
try {
s.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s.release();
}
});
assertThat(lockedCounter.get()).isEqualTo(iterations);
}
@Test
public void testConcurrency_MultiInstance_10_permits() throws InterruptedException {
int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
final CyclicBarrier barrier = new CyclicBarrier(10);
testMultiInstanceConcurrency(iterations, new RedissonRunnable() {
@Override
public void run(RedissonClient redisson) {
RSemaphore s = redisson.getSemaphore("test");
try {
s.acquire();
barrier.await();
assertThat(s.availablePermits()).isEqualTo(0);
assertThat(s.tryAcquire()).isFalse();
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s.release();
}
});
assertThat(lockedCounter.get()).isLessThan(iterations);
}
}
Loading…
Cancel
Save