refactoring

pull/6334/head
Nikita Koksharov 2 months ago
parent 0ac062571c
commit 4b94198b10

@ -56,6 +56,10 @@ public final class FastRemovalQueue<E> {
return false;
}
public int size() {
return index.size();
}
public E poll() {
Node<E> node = list.removeFirst();
if (node != null) {
@ -122,7 +126,7 @@ public final class FastRemovalQueue<E> {
}
public boolean remove(Node<E> node) {
return lock.execute(() -> {
Boolean r = lock.execute(() -> {
if (node.isDeleted()) {
return false;
}
@ -131,6 +135,7 @@ public final class FastRemovalQueue<E> {
node.setDeleted();
return true;
});
return Boolean.TRUE.equals(r);
}
private void removeNode(Node<E> node) {
@ -152,6 +157,10 @@ public final class FastRemovalQueue<E> {
public void moveToTail(Node<E> node) {
lock.execute(() -> {
if (node.isDeleted()) {
return;
}
removeNode(node);
node.prev = null;

@ -80,7 +80,7 @@ public class CommandsQueue extends ChannelDuplexHandler {
}
});
lock.execute(() -> {
lock.executeInterruptibly(() -> {
try {
queue.add(holder);
ctx.writeAndFlush(data, holder.getChannelPromise());

@ -16,6 +16,7 @@
package org.redisson.misc;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/**
*
@ -29,6 +30,16 @@ public final class SpinLock {
private final int spinLimit = 7000;
private int nestedLevel;
private final boolean reentrant;
public SpinLock() {
this(true);
}
public SpinLock(boolean reentrant) {
this.reentrant = reentrant;
}
private void lockInterruptibly() throws InterruptedException {
int spins = 0;
while (true) {
@ -36,7 +47,8 @@ public final class SpinLock {
throw new InterruptedException();
}
if (acquired.get() == Thread.currentThread()) {
if (reentrant
&& acquired.get() == Thread.currentThread()) {
nestedLevel++;
return;
} else if (acquired.get() == null
@ -60,7 +72,35 @@ public final class SpinLock {
}
}
public void execute(Runnable r) throws InterruptedException {
public void execute(Runnable r) {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
try {
r.run();
} finally {
unlock();
}
}
public <T> T execute(Supplier<T> r) {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
try {
return r.get();
} finally {
unlock();
}
}
public void executeInterruptibly(Runnable r) throws InterruptedException {
lockInterruptibly();
try {
r.run();

@ -11,9 +11,9 @@ public class SpinLockTest {
public void test() throws InterruptedException {
SpinLock l = new SpinLock();
CountDownLatch latch = new CountDownLatch(1);
l.execute(() -> {
l.executeInterruptibly(() -> {
try {
l.execute(() -> {
l.executeInterruptibly(() -> {
latch.countDown();
});
} catch (InterruptedException e) {

Loading…
Cancel
Save