Review fixes:

- removed all unnecessary changes
- use table insert

Signed-off-by: Sergey Kuznetsov <sergey.kuznetsov@infobip.com>
pull/5247/head
Sergey Kuznetsov 1 year ago
parent a109fe9cf2
commit e24d413119

@ -16,6 +16,7 @@
package org.redisson;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.RFuture;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.client.codec.ByteArrayCodec;
@ -69,7 +70,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
List<String> ids = acquire(1, leaseTime, timeUnit);
return getFirstOrNull(ids);
}
@Override
public List<String> acquire(int permits, long leaseTime, TimeUnit timeUnit) throws InterruptedException {
List<String> ids = tryAcquire(permits, leaseTime, timeUnit);
@ -103,7 +104,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
// return get(acquireAsync(permits, leaseTime, timeUnit));
}
@Override
public RFuture<String> acquireAsync() {
CompletionStage<String> future = acquireAsync(1)
@ -115,7 +116,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public RFuture<List<String>> acquireAsync(int permits) {
return acquireAsync(permits, -1, TimeUnit.MILLISECONDS);
}
@Override
public RFuture<String> acquireAsync(long leaseTime, TimeUnit timeUnit) {
CompletionStage<String> future = acquireAsync(1, leaseTime, timeUnit)
@ -202,15 +203,18 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
Timeout scheduledFuture;
if (nearestTimeout != null) {
scheduledFuture = getServiceManager().newTimeout(timeout -> {
if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
return;
scheduledFuture = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
return;
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
scheduledFuture = null;
@ -234,16 +238,19 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
entry.addListener(listener);
long t = time.get();
Timeout waitTimeoutFuture = getServiceManager().newTimeout(timeout -> {
if (scheduledFuture != null && !scheduledFuture.cancel()) {
return;
}
Timeout waitTimeoutFuture = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (scheduledFuture != null && !scheduledFuture.cancel()) {
return;
}
if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}
}
}, t, TimeUnit.MILLISECONDS);
waitTimeoutFutureRef.set(waitTimeoutFuture);
@ -367,13 +374,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
private RFuture<List<String>> tryAcquireAsync(List<String> ids, long timeoutDate) {
CompletionStage<List<String>> future = commandExecutor.syncedEval(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_STRING,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " +
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[6], KEYS[3], value); " +
"end; " +
"end;" +
"end; " +
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
@ -427,7 +434,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
@Override
public List<String> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
@ -472,7 +479,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
time -= System.currentTimeMillis() - current;
if (time <= 0) {
return null;
return Collections.emptyList();
}
// waiting for message
@ -524,7 +531,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
long current = System.currentTimeMillis();
AtomicReference<Timeout> futureRef = new AtomicReference<>();
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.whenComplete((r, ex) -> {
if (ex != null) {
@ -543,9 +550,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
});
if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = getServiceManager().newTimeout(timeout -> {
if (!subscribeFuture.isDone()) {
result.complete(null);
Timeout scheduledFuture = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
result.complete(null);
}
}
}, time.get(), TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
@ -599,21 +609,21 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local expire = redis.call('zscore', KEYS[3], ARGV[1]);" +
"local removed = redis.call('zrem', KEYS[3], ARGV[1]);" +
"if tonumber(removed) ~= 1 then " +
"return 0;" +
"end;" +
"local value = redis.call('incrby', KEYS[1], ARGV[2]); " +
"redis.call(ARGV[4], KEYS[2], value); " +
"if tonumber(expire) <= tonumber(ARGV[3]) then " +
"return 0;" +
"end;" +
"return 1;",
"local expire = redis.call('zscore', KEYS[3], ARGV[1]);" +
"local removed = redis.call('zrem', KEYS[3], ARGV[1]);" +
"if tonumber(removed) ~= 1 then " +
"return 0;" +
"end;" +
"local value = redis.call('incrby', KEYS[1], ARGV[2]); " +
"redis.call(ARGV[4], KEYS[2], value); " +
"if tonumber(expire) <= tonumber(ARGV[3]) then " +
"return 0;" +
"end;" +
"return 1;",
Arrays.asList(getRawName(), channelName, timeoutName),
permitId, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
public RFuture<Integer> tryReleaseAsync(List<String> permitsIds) {
if (permitsIds == null || permitsIds.isEmpty()) {
@ -631,7 +641,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end; " +
"local keys = {}; " +
"for key in string.gmatch(ARGV[1], '%w+') do " +
"keys[#keys + 1] = key; " +
"table.insert(keys, key); " +
"end; " +
"local removed = redis.call('zrem', KEYS[3], unpack(keys)); " +
"if tonumber(removed) == 0 then " +
@ -646,7 +656,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.asList(getRawName(), timeoutName);
List<Object> keys = Arrays.<Object>asList(getRawName(), timeoutName);
return super.sizeInMemoryAsync(keys);
}
@ -711,15 +721,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[2], KEYS[3], value); " +
"end;" +
"end;" +
"return value; " +
"end; " +
"local ret = redis.call('get', KEYS[1]); " +
"local ret = redis.call('get', KEYS[1]); " +
"return ret == false and 0 or ret;",
Arrays.asList(getRawName(), timeoutName, channelName),
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@ -743,7 +753,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return tonumber(available) " +
"end;" +
"return tonumber(available) + acquired;",
Arrays.asList(getRawName(), timeoutName, channelName),
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@ -760,7 +770,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end; " +
"local acquired = redis.call('zcount', KEYS[2], 0, '+inf'); " +
"return acquired == false and 0 or acquired;",
Arrays.asList(getRawName(), timeoutName, channelName),
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@ -790,7 +800,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end;" +
"redis.call('incrby', KEYS[1], tonumber(ARGV[1]) - maximum); " +
"redis.call(ARGV[2], KEYS[2], ARGV[1]);",
Arrays.asList(getRawName(), channelName, timeoutName),
Arrays.<Object>asList(getRawName(), channelName, timeoutName),
permits, getSubscribeService().getPublishCommand());
}
@ -798,13 +808,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then " +
"redis.call('set', KEYS[1], ARGV[1]); " +
"redis.call(ARGV[2], KEYS[2], ARGV[1]); " +
"return 1;" +
"end;" +
"return 0;",
Arrays.asList(getRawName(), channelName),
"if (value == false) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call(ARGV[2], KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getRawName(), channelName),
permits, getSubscribeService().getPublishCommand());
}
@ -817,13 +827,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public RFuture<Void> addPermitsAsync(int permits) {
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then " +
"value = 0;" +
"end;" +
"redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); " +
"if tonumber(ARGV[1]) > 0 then " +
"redis.call(ARGV[2], KEYS[2], ARGV[1]); " +
"end;",
"if (value == false) then "
+ "value = 0;"
+ "end;"
+ "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); "
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call(ARGV[2], KEYS[2], ARGV[1]); "
+ "end;",
Arrays.asList(getRawName(), channelName), permits, getSubscribeService().getPublishCommand());
}
@ -834,18 +844,18 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[3], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[4], KEYS[3], value); " +
"end;" +
"end;" +
"end; " +
"local value = redis.call('zscore', KEYS[2], ARGV[1]); " +
"if (value ~= false) then " +
"redis.call('zadd', KEYS[2], ARGV[2], ARGV[1]); " +
"return 1;" +
"end;" +
"return 0;",
"local value = redis.call('zscore', KEYS[2], ARGV[1]); " +
"if (value ~= false) then "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.asList(getRawName(), timeoutName, channelName),
permitId, timeoutDate, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}

@ -103,7 +103,6 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
* if the waiting time elapsed before a permit was acquired
* @throws InterruptedException if the current thread is interrupted
*/
String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException;
/**

@ -175,7 +175,7 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
};
t.start();
@ -206,7 +206,7 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
};
t.start();
@ -414,10 +414,10 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
RPermitExpirableSemaphore s1 = redisson.getPermitExpirableSemaphore("test");
try {
String permitId = s1.acquire();
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s1.release(permitId);
} catch (InterruptedException e) {
int value = lockedCounter.get();
lockedCounter.set(value + 1);
s1.release(permitId);
}catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
@ -443,9 +443,10 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.getAndIncrement();
int value = lockedCounter.get();
lockedCounter.set(value + 1);
r.getPermitExpirableSemaphore("test").release(permitId);
} catch (InterruptedException e1) {
}catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}

Loading…
Cancel
Save