Fixed - methods belongs to transactional objects get blocked at high concurrency. #1459

pull/1499/head
Nikita 7 years ago
parent c3cdb4f5a8
commit 8d347ad22b

@ -35,27 +35,16 @@ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) {
value.getLatch().release();
while (true) {
Runnable runnableToExecute = null;
synchronized (value) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnableToExecute = runnable;
} else {
value.addListener(runnable);
}
}
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
if (runnableToExecute != null) {
runnableToExecute.run();
} else {
return;
}
runnableToExecute.run();
}
value.getLatch().release();
}
}

@ -32,27 +32,16 @@ public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
value.getLatch().release(message.intValue());
while (true) {
Runnable runnableToExecute = null;
synchronized (value) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnableToExecute = runnable;
} else {
value.addListener(runnable);
}
}
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
if (runnableToExecute != null) {
runnableToExecute.run();
} else {
return;
}
runnableToExecute.run();
}
value.getLatch().release(message.intValue());
}
}

@ -1,9 +1,12 @@
package org.redisson.transaction;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.redisson.BaseTest;
@ -17,6 +20,25 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest {
protected abstract RMap<String, String> getTransactionalMap(RTransaction transaction);
@Test
public void testFastPut() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(16);
for (int i = 0; i < 500; i++) {
executor.submit(() -> {
for (int j = 0; j < 100; j++) {
RTransaction t = redisson.createTransaction(TransactionOptions.defaults());
RMap<String, String> map = getTransactionalMap(t);
map.fastPut("1", "1");
t.commit();
}
});
}
executor.shutdown();
assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
}
@Test
public void testPutAll() {
RMap<String, String> m = getMap();

Loading…
Cancel
Save