Feature - RedissonMultiLock implements isHeldByThread() and isHeldByCurrentThread() methods. #3243

pull/5676/head
Nikita Koksharov 11 months ago
parent eb51f479d7
commit 0f05a859f8

@ -22,8 +22,13 @@ import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.stream.Stream;
/** /**
* Groups multiple independent locks and manages them as one lock. * Groups multiple independent locks and manages them as one lock.
@ -281,12 +286,9 @@ public class RedissonMultiLock implements RLock {
} }
protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) { protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) {
List<CompletableFuture<Void>> futures = new ArrayList<>(locks.size()); CompletableFuture[] s = locks.stream().map(l -> l.unlockAsync(threadId).toCompletableFuture())
for (RLock lock : locks) { .toArray(CompletableFuture[]::new);
RFuture<Void> f = lock.unlockAsync(threadId); CompletableFuture<Void> future = CompletableFuture.allOf(s);
futures.add(f.toCompletableFuture());
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(future); return new CompletableFutureWrapper<>(future);
} }
@ -408,15 +410,7 @@ public class RedissonMultiLock implements RLock {
@Override @Override
public void unlock() { public void unlock() {
List<RFuture<Void>> futures = new ArrayList<>(locks.size()); locks.forEach(Lock::unlock);
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
for (RFuture<Void> future : futures) {
future.toCompletableFuture().join();
}
} }
@Override @Override
@ -486,17 +480,23 @@ public class RedissonMultiLock implements RLock {
@Override @Override
public boolean isHeldByThread(long threadId) { public boolean isHeldByThread(long threadId) {
throw new UnsupportedOperationException(); return locks.stream().map(l -> l.isHeldByThread(threadId)).reduce(true, (r, u) -> r && u);
} }
@Override @Override
public RFuture<Boolean> isHeldByThreadAsync(long threadId) { public RFuture<Boolean> isHeldByThreadAsync(long threadId) {
throw new UnsupportedOperationException(); CompletableFuture<Boolean>[] s = locks.stream().map(l -> l.isHeldByThreadAsync(threadId).toCompletableFuture())
.toArray(CompletableFuture[]::new);
CompletableFuture<Void> future = CompletableFuture.allOf(s);
CompletableFuture<Boolean> f = future.thenApply(v -> Stream.of(s).map(r2 -> r2.getNow(false))
.reduce(true, (r, u) -> r && u));
return new CompletableFutureWrapper<>(f);
} }
@Override @Override
public boolean isHeldByCurrentThread() { public boolean isHeldByCurrentThread() {
throw new UnsupportedOperationException(); return locks.stream().map(l -> l.isHeldByCurrentThread())
.reduce(true, (r, u) -> r && u);
} }
@Override @Override

@ -27,12 +27,16 @@ public class RedissonMultiLockTest extends RedisDockerTest {
RLock lock = redisson.getMultiLock(lock1, lock2, lock3); RLock lock = redisson.getMultiLock(lock1, lock2, lock3);
try { try {
lock.lock(10, TimeUnit.SECONDS); lock.lock(10, TimeUnit.SECONDS);
assertThat(lock.isHeldByCurrentThread()).isTrue();
assertThat(lock.isHeldByThread(Thread.currentThread().threadId())).isTrue();
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} finally { } finally {
lock.unlock(); lock.unlock();
} }
assertThat(lock.isHeldByCurrentThread()).isFalse();
assertThat(lock.isHeldByThread(Thread.currentThread().threadId())).isFalse();
} }
@Test @Test

Loading…
Cancel
Save