Merge branch 'master' of github.com:redisson/redisson

pull/5108/head
Nikita Koksharov 2 years ago
commit f770c86fe0

@ -67,14 +67,14 @@ Amount of Redisson instances created by Tomcat for multiple contexts could be re
### 2. Copy two jars into `TOMCAT_BASE/lib` directory:
[redisson-all-3.22.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.22.0&e=jar)
[redisson-all-3.22.0.jar](https://repo1.maven.org/maven2/org/redisson/redisson-all/3.22.0/redisson-all-3.22.0.jar)
Tomcat 7.x - [redisson-tomcat-7-3.22.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.22.0&e=jar)
Tomcat 7.x - [redisson-tomcat-7-3.22.0.jar](https://repo1.maven.org/maven2/org/redisson/redisson-tomcat-7/3.22.0/redisson-tomcat-7-3.22.0.jar)
Tomcat 8.x - [redisson-tomcat-8-3.22.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.22.0&e=jar)
Tomcat 8.x - [redisson-tomcat-8-3.22.0.jar](https://repo1.maven.org/maven2/org/redisson/redisson-tomcat-8/3.22.0/redisson-tomcat-8-3.22.0.jar)
Tomcat 9.x - [redisson-tomcat-9-3.22.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-9&v=3.22.0&e=jar)
Tomcat 9.x - [redisson-tomcat-9-3.22.0.jar](https://repo1.maven.org/maven2/org/redisson/redisson-tomcat-9/3.22.0/redisson-tomcat-9-3.22.0.jar)
Tomcat 10.x - [redisson-tomcat-10-3.22.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-10&v=3.22.0&e=jar)
Tomcat 10.x - [redisson-tomcat-10-3.22.0.jar](https://repo1.maven.org/maven2/org/redisson/redisson-tomcat-10/3.22.0/redisson-tomcat-10-3.22.0.jar)
Try __[Redisson PRO](https://redisson.pro)__ with **ultra-fast performance** and **support by SLA**.

@ -172,7 +172,7 @@ public class RedissonLock extends RedissonBaseLock {
return new CompletableFutureWrapper<>(f);
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);

@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
@ -145,31 +144,28 @@ public class RedissonSpinLock extends RedissonBaseLock {
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
final long time = unit.toMillis(waitTime);
final long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
if (System.currentTimeMillis() - current >= time) {
acquireFailed(waitTime, unit, threadId);
return false;
}
LockOptions.BackOffPolicy backOffPolicy = backOff.create();
while (true) {
current = System.currentTimeMillis();
Thread.sleep(backOffPolicy.getNextSleepPeriod());
ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
if (System.currentTimeMillis() - current >= time) {
acquireFailed(waitTime, unit, threadId);
return false;
}
@ -262,16 +258,14 @@ public class RedissonSpinLock extends RedissonBaseLock {
long currentThreadId) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
LockOptions.BackOffPolicy backOffPolicy = backOff.create();
tryLock(leaseTime, unit, currentThreadId, result, time, backOffPolicy);
tryLock(System.currentTimeMillis(), leaseTime, unit, currentThreadId, result, unit.toMillis(waitTime), backOffPolicy);
return new CompletableFutureWrapper<>(result);
}
private void tryLock(long leaseTime, TimeUnit unit, long currentThreadId, CompletableFuture<Boolean> result,
AtomicLong time, LockOptions.BackOffPolicy backOffPolicy) {
long startTime = System.currentTimeMillis();
private void tryLock(long startTime, long leaseTime, TimeUnit unit, long currentThreadId, CompletableFuture<Boolean> result,
long waitTime, LockOptions.BackOffPolicy backOffPolicy) {
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.whenComplete((ttl, e) -> {
if (e != null) {
@ -287,17 +281,14 @@ public class RedissonSpinLock extends RedissonBaseLock {
return;
}
long el = System.currentTimeMillis() - startTime;
time.addAndGet(-el);
if (time.get() <= 0) {
if (System.currentTimeMillis() - startTime >= waitTime) {
trySuccessFalse(currentThreadId, result);
return;
}
long nextSleepPeriod = backOffPolicy.getNextSleepPeriod();
getServiceManager().newTimeout(
timeout -> tryLock(leaseTime, unit, currentThreadId, result, time, backOffPolicy),
timeout -> tryLock(startTime, leaseTime, unit, currentThreadId, result, waitTime, backOffPolicy),
nextSleepPeriod, TimeUnit.MILLISECONDS);
});
}

@ -21,6 +21,8 @@ import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.misc.ProxyBuilder;
import org.redisson.misc.ProxyBuilder.Callback;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
*
@ -37,7 +39,12 @@ public class ReactiveProxyBuilder {
return ProxyBuilder.create(new Callback() {
@Override
public Object execute(Callable<RFuture<Object>> callable, Method instanceMethod) {
return commandExecutor.reactive(callable);
Mono<Object> result = commandExecutor.reactive(callable);
if (instanceMethod.getReturnType().isAssignableFrom(Flux.class)) {
Mono<Iterable> monoListResult = result.cast(Iterable.class);
return monoListResult.flatMapMany(Flux::fromIterable);
}
return result;
}
}, instance, implementation, clazz, commandExecutor.getServiceManager());
}

@ -124,4 +124,21 @@ public class RedissonDequeReactiveTest extends BaseReactiveTest {
assertThat(toIterator(queue.descendingIterator())).toIterable().containsExactly(3, 2, 1);
}
@Test
public void testPollLast() {
final RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addAll(Arrays.asList(1, 2, 3)));
assertThat(toIterator(queue.pollLast(2))).toIterable().containsExactly(3, 2);
}
@Test
public void testPollFirst() {
final RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addAll(Arrays.asList(1, 2, 3)));
assertThat(toIterator(queue.pollFirst(2))).toIterable().containsExactly(1, 2);
}
}

@ -473,6 +473,27 @@ public class RedissonSpinLockTest extends BaseConcurrentTest {
}
@Test
public void testTryLockAsyncWaitTime() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
AtomicBoolean lockAsyncSucceed = new AtomicBoolean(true);
Thread thread = new Thread(() -> {
RFuture<Boolean> booleanRFuture = lock.tryLockAsync(1, 30, TimeUnit.SECONDS);
booleanRFuture.whenComplete((res, e) -> {
if (e != null) {
Assertions.fail("Lock aquire failed for some reason");
}
lockAsyncSucceed.set(res);
});
});
thread.start();
Thread.sleep(1500);
assertThat(lockAsyncSucceed.get()).isFalse();
lock.forceUnlock();
}
@Test
public void testTryLockAsyncFailed() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");

Loading…
Cancel
Save