PermitExpirableSemaphore implemented. #604

pull/610/head
Nikita 9 years ago
parent 82dbf40c99
commit 07f0228de5

@ -55,6 +55,7 @@ import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RScript;
import org.redisson.api.RSemaphore;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetMultimap;
@ -467,6 +468,11 @@ public class Redisson implements RedissonClient {
public RSemaphore getSemaphore(String name) {
return new RedissonSemaphore(commandExecutor, name, semaphorePubSub);
}
public RPermitExpirableSemaphore getPermitExpirableSemaphore(String name) {
return new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub);
}
@Override
public <V> RBloomFilter<V> getBloomFilter(String name) {

@ -0,0 +1,674 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.SemaphorePubSub;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonPermitExpirableSemaphore extends RedissonExpirable implements RPermitExpirableSemaphore {
private final SemaphorePubSub semaphorePubSub;
final CommandExecutor commandExecutor;
private final String timeoutName;
private final long nonExpirableTimeout = 922337203685477L;
protected RedissonPermitExpirableSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
super(commandExecutor, name);
this.timeoutName = "{" + name + "}:timeout";
this.commandExecutor = commandExecutor;
this.semaphorePubSub = semaphorePubSub;
}
String getChannelName() {
return getChannelName(getName());
}
public static String getChannelName(String name) {
if (name.contains("{")) {
return "redisson_sc:" + name;
}
return "redisson_sc:{" + name + "}";
}
@Override
public String acquire() throws InterruptedException {
return acquire(1, -1, TimeUnit.MILLISECONDS);
}
@Override
public String acquire(long leaseTime, TimeUnit timeUnit) throws InterruptedException {
return acquire(1, leaseTime, timeUnit);
}
@Override
public RFuture<String> acquireAsync(long leaseTime, TimeUnit timeUnit) {
return acquireAsync(1, leaseTime, timeUnit);
}
private String acquire(int permits, long ttl, TimeUnit timeUnit) throws InterruptedException {
String permitId = tryAcquire(permits, ttl, timeUnit);
if (permitId != null && !permitId.startsWith(":")) {
return permitId;
}
RFuture<RedissonLockEntry> future = subscribe();
get(future);
try {
while (true) {
final Long nearestTimeout;
permitId = tryAcquire(permits, ttl, timeUnit);
if (permitId != null) {
if (!permitId.startsWith(":")) {
return permitId;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
}
if (nearestTimeout != null) {
getEntry().getLatch().tryAcquire(permits, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
getEntry().getLatch().acquire(permits);
}
}
} finally {
unsubscribe(future);
}
// return get(acquireAsync(permits, ttl, timeUnit));
}
public RFuture<String> acquireAsync() {
return acquireAsync(1, -1, TimeUnit.MILLISECONDS);
}
private RFuture<String> acquireAsync(final int permits, final long ttl, final TimeUnit timeUnit) {
final RPromise<String> result = newPromise();
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
String permitId = future.getNow();
if (permitId != null && !permitId.startsWith(":")) {
if (!result.trySuccess(permitId)) {
releaseAsync(permitId);
}
return;
}
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
}
});
}
});
return result;
}
private void tryAcquireAsync(final AtomicLong time, final int permits, final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<String> result, final long ttl, final TimeUnit timeUnit) {
if (result.isDone()) {
unsubscribe(subscribeFuture);
return;
}
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
unsubscribe(subscribeFuture);
result.tryFailure(future.cause());
return;
}
final Long nearestTimeout;
String permitId = future.getNow();
if (permitId != null) {
if (!permitId.startsWith(":")) {
unsubscribe(subscribeFuture);
if (!result.trySuccess(permitId)) {
releaseAsync(permitId);
}
return;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
}
if (time.get() < 0) {
unsubscribe(subscribeFuture);
result.trySuccess(null);
return;
}
// waiting for message
final long current = System.currentTimeMillis();
final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire()) {
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
} else {
final AtomicReference<Timeout> waitTimeoutFutureRef = new AtomicReference<Timeout>();
final Timeout scheduledFuture;
if (nearestTimeout != null) {
scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
return;
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
}
}, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
scheduledFuture = null;
}
final Runnable listener = new Runnable() {
@Override
public void run() {
if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
entry.getLatch().release();
return;
}
if (scheduledFuture != null && !scheduledFuture.cancel()) {
entry.getLatch().release();
return;
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
}
};
entry.addListener(listener);
long t = time.get();
Timeout waitTimeoutFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (scheduledFuture != null && !scheduledFuture.cancel()) {
return;
}
synchronized (entry) {
if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
}
}
}
}, t, TimeUnit.MILLISECONDS);
waitTimeoutFutureRef.set(waitTimeoutFuture);
}
}
}
});
}
private void acquireAsync(final int permits, final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<String> result, final long ttl, final TimeUnit timeUnit) {
if (result.isDone()) {
unsubscribe(subscribeFuture);
return;
}
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
unsubscribe(subscribeFuture);
result.setFailure(future.cause());
return;
}
final Long nearestTimeout;
String permitId = future.getNow();
if (permitId != null) {
if (!permitId.startsWith(":")) {
unsubscribe(subscribeFuture);
if (!result.trySuccess(permitId)) {
releaseAsync(permitId);
}
return;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
}
final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire(permits)) {
acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
} else {
final Timeout scheduledFuture;
if (nearestTimeout != null) {
scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
}
}, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
scheduledFuture = null;
}
Runnable listener = new Runnable() {
@Override
public void run() {
if (scheduledFuture != null && !scheduledFuture.cancel()) {
entry.getLatch().release();
return;
}
acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
}
};
entry.addListener(listener);
}
}
}
});
}
@Override
public String tryAcquire() {
String res = tryAcquire(1, -1, TimeUnit.MILLISECONDS);
if (res != null && res.startsWith(":")) {
return null;
}
return res;
}
private String tryAcquire(int permits, long ttl, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit);
return get(tryAcquireAsync(permits, timeoutDate));
}
private long calcTimeout(long ttl, TimeUnit timeUnit) {
if (ttl != -1) {
return System.currentTimeMillis() + timeUnit.toMillis(ttl);
}
return nonExpirableTimeout;
}
public RFuture<String> tryAcquireAsync() {
final RPromise<String> result = newPromise();
RFuture<String> res = tryAcquireAsync(1, nonExpirableTimeout);
res.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
String permitId = future.getNow();
if (permitId != null && !permitId.startsWith(":")) {
if (!result.trySuccess(permitId)) {
releaseAsync(permitId);
}
} else {
result.trySuccess(null);
}
}
});
return result;
}
protected String generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
public RFuture<String> tryAcquireAsync(int permits, long timeoutDate) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
String id = generateId();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_STRING_DATA,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"end;" +
"end; " +
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"redis.call('decrby', KEYS[1], ARGV[1]); " +
"redis.call('zadd', KEYS[2], ARGV[2], ARGV[3]); " +
"return ARGV[3]; " +
"end; " +
"local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " +
"if v[1] ~= nil and v[2] ~= ARGV[5] then " +
"return ':' .. tostring(v[2]); " +
"end " +
"return nil;",
Arrays.<Object>asList(getName(), timeoutName, getChannelName()), permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout);
}
public RFuture<String> tryAcquireAsync(long waitTime, TimeUnit unit) {
return tryAcquireAsync(1, waitTime, -1, unit);
}
@Override
public String tryAcquire(long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
return tryAcquire(1, waitTime, ttl, unit);
}
@Override
public RFuture<String> tryAcquireAsync(long waitTime, long ttl, TimeUnit unit) {
return tryAcquireAsync(1, waitTime, ttl, unit);
}
private String tryAcquire(int permits, long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
String permitId = tryAcquire(permits, ttl, unit);
if (permitId != null && !permitId.startsWith(":")) {
return permitId;
}
long time = unit.toMillis(waitTime);
RFuture<RedissonLockEntry> future = subscribe();
if (!await(future, time, TimeUnit.MILLISECONDS)) {
return null;
}
try {
while (true) {
final Long nearestTimeout;
permitId = tryAcquire(permits, ttl, unit);
if (permitId != null) {
if (!permitId.startsWith(":")) {
return permitId;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
}
if (time <= 0) {
return null;
}
// waiting for message
long current = System.currentTimeMillis();
if (nearestTimeout != null) {
getEntry().getLatch().tryAcquire(permits, Math.min(time, nearestTimeout), TimeUnit.MILLISECONDS);
} else {
getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
}
long elapsed = System.currentTimeMillis() - current;
time -= elapsed;
}
} finally {
unsubscribe(future);
}
// return get(tryAcquireAsync(permits, waitTime, ttl, unit));
}
private RFuture<String> tryAcquireAsync(final int permits, long waitTime, final long ttl, final TimeUnit timeUnit) {
final RPromise<String> result = newPromise();
final AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime));
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
String permitId = future.getNow();
if (permitId != null && !permitId.startsWith(":")) {
if (!result.trySuccess(permitId)) {
releaseAsync(permitId);
}
return;
}
final long current = System.currentTimeMillis();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final RFuture<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
if (futureRef.get() != null) {
futureRef.get().cancel();
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
if (time.get() < 0) {
unsubscribe(subscribeFuture);
result.trySuccess(null);
return;
}
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
}
});
if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
result.trySuccess(null);
}
}
}, time.get(), TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
}
});
return result;
}
private RedissonLockEntry getEntry() {
return semaphorePubSub.getEntry(getName());
}
private RFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
}
private void unsubscribe(RFuture<RedissonLockEntry> future) {
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
}
@Override
public String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException {
String res = tryAcquire(1, waitTime, -1, unit);
if (res != null && res.startsWith(":")) {
return null;
}
return res;
}
@Override
public void release(String permitId) {
get(releaseAsync(permitId));
}
@Override
public boolean tryRelease(String permitId) {
return get(tryReleaseAsync(permitId));
}
@Override
public RFuture<Boolean> tryReleaseAsync(String permitId) {
if (permitId == null) {
throw new IllegalArgumentException("permitId can't be null");
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local removed = redis.call('zrem', KEYS[3], ARGV[1]);" +
"if tonumber(removed) ~= 1 then " +
"return 0;" +
"end;" +
"local value = redis.call('incrby', KEYS[1], ARGV[2]); " +
"redis.call('publish', KEYS[2], value); " +
"return 1;",
Arrays.<Object>asList(getName(), getChannelName(), timeoutName), permitId, 1);
}
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), timeoutName);
}
@Override
public RFuture<Void> releaseAsync(final String permitId) {
final RPromise<Void> result = newPromise();
tryReleaseAsync(permitId).addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (future.getNow()) {
result.trySuccess(null);
} else {
result.tryFailure(new IllegalArgumentException("Permit with id " + permitId + " has already been released or doesn't exist"));
}
}
});
return result;
}
@Override
public int availablePermits() {
return get(availablePermitsAsync());
}
@Override
public RFuture<Integer> availablePermitsAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, getName());
}
@Override
public boolean trySetPermits(int permits) {
return get(trySetPermitsAsync(permits));
}
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
@Override
public void addPermits(int permits) {
get(addPermitsAsync(permits));
}
@Override
public RFuture<Void> addPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "value = 0;"
+ "end;"
+ "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); "
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
}

@ -0,0 +1,226 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Semaphore object with lease time parameter support for each acquired permit.
*
* <p>Each permit identified by own id and could be released only using its id.
* Permit id is a 128-bits unique random identifier generated each time during acquiring.
*
* <p>Works in non-fair mode. Therefore order of acquiring is unpredictable.
*
* @author Nikita Koksharov
*
*/
public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableSemaphoreAsync {
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns its id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @return permit id
* @throws InterruptedException if the current thread is interrupted
*/
String acquire() throws InterruptedException;
/**
* Acquires a permit with defined lease time from this semaphore,
* blocking until one is available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns its id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param leaseTime - permit lease time
* @param unit
* @return
* @throws InterruptedException if the current thread is interrupted
*/
String acquire(long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Acquires a permit only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code null}.
*
* @return permit id if a permit was acquired and {@code null}
* otherwise
*/
String tryAcquire();
/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the permit id is returned.
*
* <p>If the specified waiting time elapses then the value {@code null}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param waitTime the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return permit id if a permit was acquired and {@code null}
* if the waiting time elapsed before a permit was acquired
* @throws InterruptedException if the current thread is interrupted
*/
String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException;
/**
* Acquires a permit with defined lease time from this semaphore,
* if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the permit id is returned.
*
* <p>If the specified waiting time elapses then the value {@code null}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param waitTime the maximum time to wait for a permit
* @param leaseTime permit lease time
* @param unit the time unit of the {@code timeout} argument
* @return permit id if a permit was acquired and {@code null}
* if the waiting time elapsed before a permit was acquired
* @throws InterruptedException if the current thread is interrupted
*/
String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Releases a permit by its id, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permitId
* @return {@code true} if a permit has been released and {@code false}
* otherwise
*/
boolean tryRelease(String permitId);
/**
* Releases a permit by its id, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* <p>Throws an exception if permit id doesn't exist or has already been release
*
* @param permitId
*/
void release(String permitId);
/**
* Returns the current number of available permits.
*
* @return number of available permits
*/
int availablePermits();
/**
* Sets new number of permits.
*
* @param count - number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @result <code>true</code> if semaphore has not initialized yet, otherwise <code>false</code>.
*
*/
boolean trySetPermits(int permits);
/**
* Increases or decreases the number of available permits by defined value.
*
* @param number of permits to add/remove
*/
void addPermits(int permits);
}

@ -0,0 +1,222 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Semaphore object with support of lease time parameter for each acquired permit.
*
* <p>Each permit identified by own id and could be released only using its id.
* Permit id is a 128-bits unique random identifier generated each time during acquiring.
*
* <p>Works in non-fair mode. Therefore order of acquiring is unpredictable.
*
* @author Nikita Koksharov
*
*/
public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns its id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @return permit id
*/
RFuture<String> acquireAsync();
/**
* Acquires a permit with defined lease time from this semaphore,
* blocking until one is available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns its id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param leaseTime - permit lease time
* @param unit
* @return
*/
RFuture<String> acquireAsync(long leaseTime, TimeUnit unit);
/**
* Acquires a permit only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code null}.
*
* @return permit id if a permit was acquired and {@code null}
* otherwise
*/
RFuture<String> tryAcquireAsync();
/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the permit id is returned.
*
* <p>If the specified waiting time elapses then the value {@code null}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param waitTime the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return permit id if a permit was acquired and {@code null}
* if the waiting time elapsed before a permit was acquired
*/
RFuture<String> tryAcquireAsync(long waitTime, TimeUnit unit);
/**
* Acquires a permit with defined lease time from this semaphore,
* if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the permit id is returned.
*
* <p>If the specified waiting time elapses then the value {@code null}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param waitTime the maximum time to wait for a permit
* @param leaseTime permit lease time
* @param unit the time unit of the {@code timeout} argument
* @return permit id if a permit was acquired and {@code null}
* if the waiting time elapsed before a permit was acquired
*/
RFuture<String> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit);
/**
* Releases a permit by its id, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permitId
* @return {@code true} if a permit has been released and {@code false}
* otherwise
*/
RFuture<Boolean> tryReleaseAsync(String permitId);
/**
* Releases a permit by its id, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* <p>Throws an exception if permit id doesn't exist or has already been release
*
* @param permitId
*/
RFuture<Void> releaseAsync(String permitId);
/**
* Returns the current number of available permits.
*
* @return number of available permits
*/
RFuture<Integer> availablePermitsAsync();
/**
* Sets new number of permits.
*
* @param count - number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @result <code>true</code> if semaphore has not initialized yet, otherwise <code>false</code>.
*
*/
RFuture<Boolean> trySetPermitsAsync(int permits);
/**
* Increases or decreases the number of available permits by defined value.
*
* @param number of permits to add/remove
*/
RFuture<Void> addPermitsAsync(int permits);
}

@ -294,6 +294,15 @@ public interface RedissonClient {
* @return
*/
RSemaphore getSemaphore(String name);
/**
* Returns semaphore instance by name.
* Supports lease time parameter for each acquired permit.
*
* @param name
* @return
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(String name);
/**
* Returns lock instance by name.

@ -192,6 +192,7 @@ public interface RedisCommands {
RedisCommand<Boolean> EVAL_BOOLEAN_WITH_VALUES = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.OBJECTS);
RedisCommand<Boolean> EVAL_BOOLEAN_WITH_VALUES_6 = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, ValueType.OBJECTS);
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand<String>("EVAL", new StringReplayDecoder());
RedisStrictCommand<String> EVAL_STRING_DATA = new RedisStrictCommand<String>("EVAL", new StringDataDecoder());
RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor());
RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL");
RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor());
@ -243,6 +244,7 @@ public interface RedisCommands {
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor());
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET", 2);
RedisCommand<Void> SET = new RedisCommand<Void>("SET", new VoidReplayConvertor(), 2);
RedisCommand<Boolean> SETPXNX = new RedisCommand<Boolean>("SET", new BooleanNotNullReplayConvertor(), 2);

@ -0,0 +1,221 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.client.RedisException;
public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
@Test
public void testExpire() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(1);
String permitId = s.acquire(2, TimeUnit.SECONDS);
final long startTime = System.currentTimeMillis();
AtomicBoolean bool = new AtomicBoolean();
Thread t = new Thread() {
public void run() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
try {
String permitId = s.acquire();
long spendTime = System.currentTimeMillis() - startTime;
assertThat(spendTime).isBetween(1900L, 2100L);
s.release(permitId);
bool.set(true);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
};
t.start();
t.join();
assertThat(s.tryRelease(permitId)).isFalse();
assertThat(bool.get()).isTrue();
}
@Test
public void testExpireTryAcquire() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(1);
String permitId = s.tryAcquire(100, 2, TimeUnit.SECONDS);
final long startTime = System.currentTimeMillis();
AtomicBoolean bool = new AtomicBoolean();
Thread t = new Thread() {
public void run() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
try {
String permitId = s.acquire();
long spendTime = System.currentTimeMillis() - startTime;
assertThat(spendTime).isBetween(1900L, 2100L);
s.release(permitId);
bool.set(true);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
};
t.start();
t.join();
assertThat(s.tryRelease(permitId)).isFalse();
assertThat(bool.get()).isTrue();
}
@Test
public void testTrySetPermits() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
assertThat(s.trySetPermits(10)).isTrue();
assertThat(s.availablePermits()).isEqualTo(10);
assertThat(s.trySetPermits(15)).isFalse();
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testAddPermits() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(10);
s.addPermits(5);
assertThat(s.availablePermits()).isEqualTo(15);
s.addPermits(-10);
assertThat(s.availablePermits()).isEqualTo(5);
}
@Test
public void testBlockingAcquire() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(1);
String permitId = s.acquire();
assertThat(permitId).hasSize(32);
Thread t = new Thread() {
@Override
public void run() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
s.release(permitId);
}
};
t.start();
assertThat(s.availablePermits()).isEqualTo(0);
s.acquire();
assertThat(s.tryAcquire()).isNull();
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testTryAcquire() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(1);
String permitId = s.tryAcquire();
assertThat(permitId).hasSize(32);
Thread t = new Thread() {
@Override
public void run() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
s.release(permitId);
}
};
assertThat(s.tryAcquire()).isNull();
t.start();
t.join(1);
long startTime = System.currentTimeMillis();
String permitId2 = s.tryAcquire(1, TimeUnit.SECONDS);
assertThat(permitId2).hasSize(32);
assertThat(System.currentTimeMillis() - startTime).isBetween(500L, 600L);
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test(expected = RedisException.class)
public void testReleaseWithoutPermits() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.release("1234");
}
@Test
public void testConcurrency_SingleInstance() throws InterruptedException {
final AtomicInteger lockedCounter = new AtomicInteger();
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(1);
int iterations = 100;
testSingleInstanceConcurrency(iterations, r -> {
RPermitExpirableSemaphore s1 = redisson.getPermitExpirableSemaphore("test");
try {
String permitId = s1.acquire();
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s1.release(permitId);
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
assertThat(lockedCounter.get()).isEqualTo(iterations);
}
@Test
public void testConcurrencyLoop_MultiInstance() throws InterruptedException {
final int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(1);
testMultiInstanceConcurrency(16, r -> {
for (int i = 0; i < iterations; i++) {
try {
String permitId = r.getPermitExpirableSemaphore("test").acquire();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = lockedCounter.get();
lockedCounter.set(value + 1);
r.getPermitExpirableSemaphore("test").release(permitId);
}catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
});
assertThat(lockedCounter.get()).isEqualTo(16 * iterations);
}
}
Loading…
Cancel
Save