diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 893a19d15..4f53e9b55 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -16,6 +16,7 @@ package org.redisson; import io.netty.util.Timeout; +import io.netty.util.TimerTask; import org.redisson.api.RFuture; import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.client.codec.ByteArrayCodec; @@ -69,7 +70,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen List ids = acquire(1, leaseTime, timeUnit); return getFirstOrNull(ids); } - + @Override public List acquire(int permits, long leaseTime, TimeUnit timeUnit) throws InterruptedException { List ids = tryAcquire(permits, leaseTime, timeUnit); @@ -103,7 +104,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } // return get(acquireAsync(permits, leaseTime, timeUnit)); } - + @Override public RFuture acquireAsync() { CompletionStage future = acquireAsync(1) @@ -115,7 +116,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen public RFuture> acquireAsync(int permits) { return acquireAsync(permits, -1, TimeUnit.MILLISECONDS); } - + @Override public RFuture acquireAsync(long leaseTime, TimeUnit timeUnit) { CompletionStage future = acquireAsync(1, leaseTime, timeUnit) @@ -202,15 +203,18 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen Timeout scheduledFuture; if (nearestTimeout != null) { - scheduledFuture = getServiceManager().newTimeout(timeout -> { - if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) { - return; + scheduledFuture = getServiceManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) { + return; + } + + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); + + tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit); } - - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit); }, nearestTimeout, TimeUnit.MILLISECONDS); } else { scheduledFuture = null; @@ -234,16 +238,19 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen entry.addListener(listener); long t = time.get(); - Timeout waitTimeoutFuture = getServiceManager().newTimeout(timeout -> { - if (scheduledFuture != null && !scheduledFuture.cancel()) { - return; - } + Timeout waitTimeoutFuture = getServiceManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (scheduledFuture != null && !scheduledFuture.cancel()) { + return; + } - if (entry.removeListener(listener)) { - long elapsed = System.currentTimeMillis() - current; - time.addAndGet(-elapsed); - - tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit); + if (entry.removeListener(listener)) { + long elapsed = System.currentTimeMillis() - current; + time.addAndGet(-elapsed); + + tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit); + } } }, t, TimeUnit.MILLISECONDS); waitTimeoutFutureRef.set(waitTimeoutFuture); @@ -367,13 +374,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen private RFuture> tryAcquireAsync(List ids, long timeoutDate) { CompletionStage> future = commandExecutor.syncedEval(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_STRING, - "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " + + "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " + "if #expiredIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + "redis.call(ARGV[6], KEYS[3], value); " + - "end; " + + "end;" + "end; " + "local value = redis.call('get', KEYS[1]); " + "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + @@ -427,7 +434,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen .thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull); return new CompletableFutureWrapper<>(future); } - + @Override public List tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); @@ -472,7 +479,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen time -= System.currentTimeMillis() - current; if (time <= 0) { - return null; + return Collections.emptyList(); } // waiting for message @@ -524,7 +531,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } long current = System.currentTimeMillis(); - AtomicReference futureRef = new AtomicReference<>(); + AtomicReference futureRef = new AtomicReference(); CompletableFuture subscribeFuture = subscribe(); subscribeFuture.whenComplete((r, ex) -> { if (ex != null) { @@ -543,9 +550,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen }); if (!subscribeFuture.isDone()) { - Timeout scheduledFuture = getServiceManager().newTimeout(timeout -> { - if (!subscribeFuture.isDone()) { - result.complete(null); + Timeout scheduledFuture = getServiceManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (!subscribeFuture.isDone()) { + result.complete(null); + } } }, time.get(), TimeUnit.MILLISECONDS); futureRef.set(scheduledFuture); @@ -599,21 +609,21 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local expire = redis.call('zscore', KEYS[3], ARGV[1]);" + - "local removed = redis.call('zrem', KEYS[3], ARGV[1]);" + - "if tonumber(removed) ~= 1 then " + - "return 0;" + - "end;" + - "local value = redis.call('incrby', KEYS[1], ARGV[2]); " + - "redis.call(ARGV[4], KEYS[2], value); " + - "if tonumber(expire) <= tonumber(ARGV[3]) then " + - "return 0;" + - "end;" + - "return 1;", + "local expire = redis.call('zscore', KEYS[3], ARGV[1]);" + + "local removed = redis.call('zrem', KEYS[3], ARGV[1]);" + + "if tonumber(removed) ~= 1 then " + + "return 0;" + + "end;" + + "local value = redis.call('incrby', KEYS[1], ARGV[2]); " + + "redis.call(ARGV[4], KEYS[2], value); " + + "if tonumber(expire) <= tonumber(ARGV[3]) then " + + "return 0;" + + "end;" + + "return 1;", Arrays.asList(getRawName(), channelName, timeoutName), permitId, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } - + @Override public RFuture tryReleaseAsync(List permitsIds) { if (permitsIds == null || permitsIds.isEmpty()) { @@ -631,7 +641,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "end; " + "local keys = {}; " + "for key in string.gmatch(ARGV[1], '%w+') do " + - "keys[#keys + 1] = key; " + + "table.insert(keys, key); " + "end; " + "local removed = redis.call('zrem', KEYS[3], unpack(keys)); " + "if tonumber(removed) == 0 then " + @@ -646,7 +656,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen @Override public RFuture sizeInMemoryAsync() { - List keys = Arrays.asList(getRawName(), timeoutName); + List keys = Arrays.asList(getRawName(), timeoutName); return super.sizeInMemoryAsync(keys); } @@ -711,15 +721,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " + "if #expiredIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + - "local value = redis.call('incrby', KEYS[1], #expiredIds); " + + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + "redis.call(ARGV[2], KEYS[3], value); " + - "end;" + + "end;" + "return value; " + "end; " + - "local ret = redis.call('get', KEYS[1]); " + + "local ret = redis.call('get', KEYS[1]); " + "return ret == false and 0 or ret;", - Arrays.asList(getRawName(), timeoutName, channelName), + Arrays.asList(getRawName(), timeoutName, channelName), System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @@ -743,7 +753,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "return tonumber(available) " + "end;" + "return tonumber(available) + acquired;", - Arrays.asList(getRawName(), timeoutName, channelName), + Arrays.asList(getRawName(), timeoutName, channelName), System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @@ -760,7 +770,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "end; " + "local acquired = redis.call('zcount', KEYS[2], 0, '+inf'); " + "return acquired == false and 0 or acquired;", - Arrays.asList(getRawName(), timeoutName, channelName), + Arrays.asList(getRawName(), timeoutName, channelName), System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @@ -790,7 +800,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "end;" + "redis.call('incrby', KEYS[1], tonumber(ARGV[1]) - maximum); " + "redis.call(ARGV[2], KEYS[2], ARGV[1]);", - Arrays.asList(getRawName(), channelName, timeoutName), + Arrays.asList(getRawName(), channelName, timeoutName), permits, getSubscribeService().getPublishCommand()); } @@ -798,13 +808,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen public RFuture trySetPermitsAsync(int permits) { return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + - "if (value == false) then " + - "redis.call('set', KEYS[1], ARGV[1]); " + - "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + - "return 1;" + - "end;" + - "return 0;", - Arrays.asList(getRawName(), channelName), + "if (value == false) then " + + "redis.call('set', KEYS[1], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(getRawName(), channelName), permits, getSubscribeService().getPublishCommand()); } @@ -817,13 +827,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen public RFuture addPermitsAsync(int permits) { return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('get', KEYS[1]); " + - "if (value == false) then " + - "value = 0;" + - "end;" + - "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); " + - "if tonumber(ARGV[1]) > 0 then " + - "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + - "end;", + "if (value == false) then " + + "value = 0;" + + "end;" + + "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); " + + "if tonumber(ARGV[1]) > 0 then " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + + "end;", Arrays.asList(getRawName(), channelName), permits, getSubscribeService().getPublishCommand()); } @@ -834,18 +844,18 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[3], 'limit', 0, -1); " + "if #expiredIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + - "local value = redis.call('incrby', KEYS[1], #expiredIds); " + + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + "redis.call(ARGV[4], KEYS[3], value); " + - "end;" + + "end;" + "end; " + - "local value = redis.call('zscore', KEYS[2], ARGV[1]); " + - "if (value ~= false) then " + - "redis.call('zadd', KEYS[2], ARGV[2], ARGV[1]); " + - "return 1;" + - "end;" + - "return 0;", + "local value = redis.call('zscore', KEYS[2], ARGV[1]); " + + "if (value ~= false) then " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[1]); " + + "return 1;" + + "end;" + + "return 0;", Arrays.asList(getRawName(), timeoutName, channelName), permitId, timeoutDate, System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java index ff6df285c..17654cec2 100644 --- a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java @@ -103,7 +103,6 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS * if the waiting time elapsed before a permit was acquired * @throws InterruptedException if the current thread is interrupted */ - String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException; /** diff --git a/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java b/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java index 239086a0a..93b06bae5 100644 --- a/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java +++ b/redisson/src/test/java/org/redisson/RedissonPermitExpirableSemaphoreTest.java @@ -175,7 +175,7 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest { // TODO Auto-generated catch block e.printStackTrace(); } - } + }; }; t.start(); @@ -206,7 +206,7 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest { // TODO Auto-generated catch block e.printStackTrace(); } - } + }; }; t.start(); @@ -414,10 +414,10 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest { RPermitExpirableSemaphore s1 = redisson.getPermitExpirableSemaphore("test"); try { String permitId = s1.acquire(); - int value = lockedCounter.get(); - lockedCounter.set(value + 1); - s1.release(permitId); - } catch (InterruptedException e) { + int value = lockedCounter.get(); + lockedCounter.set(value + 1); + s1.release(permitId); + }catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } @@ -443,9 +443,10 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest { } catch (InterruptedException e) { e.printStackTrace(); } - lockedCounter.getAndIncrement(); + int value = lockedCounter.get(); + lockedCounter.set(value + 1); r.getPermitExpirableSemaphore("test").release(permitId); - } catch (InterruptedException e1) { + }catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); }