Fixed - methods belongs to transactional objects get blocked at high concurrency. #1459

pull/1499/head
Nikita 7 years ago
parent ba14300611
commit eb36207f93

@ -575,12 +575,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return; return;
} }
// waiting for message
final RedissonLockEntry entry = getEntry(currentThreadId); final RedissonLockEntry entry = getEntry(currentThreadId);
synchronized (entry) {
if (entry.getLatch().tryAcquire()) { if (entry.getLatch().tryAcquire()) {
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
} else { } else {
// waiting for message
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>(); final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Runnable listener = new Runnable() { final Runnable listener = new Runnable() {
@Override @Override
@ -598,18 +597,15 @@ public class RedissonLock extends RedissonExpirable implements RLock {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) { if (entry.removeListener(listener)) {
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId); lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
} }
} }
}
}, ttl, TimeUnit.MILLISECONDS); }, ttl, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture); futureRef.set(scheduledFuture);
} }
} }
} }
}
}); });
} }
@ -768,7 +764,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// waiting for message // waiting for message
final long current = System.currentTimeMillis(); final long current = System.currentTimeMillis();
final RedissonLockEntry entry = getEntry(currentThreadId); final RedissonLockEntry entry = getEntry(currentThreadId);
synchronized (entry) {
if (entry.getLatch().tryAcquire()) { if (entry.getLatch().tryAcquire()) {
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
} else { } else {
@ -798,7 +793,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) { if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed); time.addAndGet(-elapsed);
@ -806,13 +800,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId); tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
} }
} }
}
}, t, TimeUnit.MILLISECONDS); }, t, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture); futureRef.set(scheduledFuture);
} }
} }
} }
}
}); });
} }

@ -213,7 +213,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
// waiting for message // waiting for message
final long current = System.currentTimeMillis(); final long current = System.currentTimeMillis();
final RedissonLockEntry entry = getEntry(); final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire()) { if (entry.getLatch().tryAcquire()) {
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
} else { } else {
@ -266,7 +265,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return; return;
} }
synchronized (entry) {
if (entry.removeListener(listener)) { if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed); time.addAndGet(-elapsed);
@ -274,12 +272,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit); tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
} }
} }
}
}, t, TimeUnit.MILLISECONDS); }, t, TimeUnit.MILLISECONDS);
waitTimeoutFutureRef.set(waitTimeoutFuture); waitTimeoutFutureRef.set(waitTimeoutFuture);
} }
} }
}
}); });
} }
@ -318,7 +314,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
final RedissonLockEntry entry = getEntry(); final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire(permits)) { if (entry.getLatch().tryAcquire(permits)) {
acquireAsync(permits, subscribeFuture, result, ttl, timeUnit); acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
} else { } else {
@ -347,7 +342,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
entry.addListener(listener); entry.addListener(listener);
} }
} }
}
}); });
} }

@ -180,7 +180,6 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
// waiting for message // waiting for message
final long current = System.currentTimeMillis(); final long current = System.currentTimeMillis();
final RedissonLockEntry entry = getEntry(); final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire()) { if (entry.getLatch().tryAcquire()) {
tryAcquireAsync(time, permits, subscribeFuture, result); tryAcquireAsync(time, permits, subscribeFuture, result);
} else { } else {
@ -207,7 +206,6 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) { if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed); time.addAndGet(-elapsed);
@ -215,13 +213,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
tryAcquireAsync(time, permits, subscribeFuture, result); tryAcquireAsync(time, permits, subscribeFuture, result);
} }
} }
}
}, t, TimeUnit.MILLISECONDS); }, t, TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture); futureRef.set(scheduledFuture);
} }
} }
} }
}
}); });
} }
@ -251,7 +247,6 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
} }
final RedissonLockEntry entry = getEntry(); final RedissonLockEntry entry = getEntry();
synchronized (entry) {
if (entry.getLatch().tryAcquire(permits)) { if (entry.getLatch().tryAcquire(permits)) {
acquireAsync(permits, subscribeFuture, result); acquireAsync(permits, subscribeFuture, result);
} else { } else {
@ -264,7 +259,6 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
entry.addListener(listener); entry.addListener(listener);
} }
} }
}
}); });
} }

@ -35,12 +35,8 @@ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
@Override @Override
protected void onMessage(RedissonLockEntry value, Long message) { protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) { if (message.equals(unlockMessage)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll(); Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) { if (runnableToExecute != null) {
break;
}
runnableToExecute.run(); runnableToExecute.run();
} }

@ -32,12 +32,8 @@ public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
@Override @Override
protected void onMessage(RedissonLockEntry value, Long message) { protected void onMessage(RedissonLockEntry value, Long message) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll(); Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) { if (runnableToExecute != null) {
break;
}
runnableToExecute.run(); runnableToExecute.run();
} }

@ -22,8 +22,8 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest {
@Test @Test
public void testFastPut() throws InterruptedException { public void testFastPut() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(16); ExecutorService executor = Executors.newFixedThreadPool(200);
for (int i = 0; i < 500; i++) { for (int i = 0; i < 2000; i++) {
executor.submit(() -> { executor.submit(() -> {
for (int j = 0; j < 100; j++) { for (int j = 0; j < 100; j++) {
RTransaction t = redisson.createTransaction(TransactionOptions.defaults()); RTransaction t = redisson.createTransaction(TransactionOptions.defaults());
@ -35,7 +35,7 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest {
} }
executor.shutdown(); executor.shutdown();
assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue();
} }

Loading…
Cancel
Save