From f7e994975e7261d66476e36e1147101261b97992 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 4 Aug 2016 22:53:41 +0300 Subject: [PATCH] Cancel unsuccessful subscription and remove AsyncSemaphore listener. #543 --- src/main/java/org/redisson/RedissonLock.java | 17 ++++---- .../org/redisson/pubsub/AsyncSemaphore.java | 7 +++- .../org/redisson/pubsub/PublishSubscribe.java | 18 ++++++-- .../org/redisson/RedissonLockHeavyTest.java | 42 +++++++++++++++++++ 4 files changed, 72 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 41241553d..0e5ad419b 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -266,14 +266,16 @@ public class RedissonLock extends RedissonExpirable implements RLock { final long threadId = Thread.currentThread().getId(); Future future = subscribe(threadId); if (!await(future, time, TimeUnit.MILLISECONDS)) { - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - unsubscribe(future, threadId); + if (!future.cancel(false)) { + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + unsubscribe(future, threadId); + } } - } - }); + }); + } return false; } @@ -639,6 +641,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void run() { if (!subscribeFuture.isDone()) { + subscribeFuture.cancel(false); result.trySuccess(false); } } diff --git a/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index e6b05a252..49f3d29b2 100644 --- a/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -65,7 +65,12 @@ public class AsyncSemaphore { if (run) { listener.run(); - return; + } + } + + public boolean remove(Runnable listener) { + synchronized (this) { + return listeners.remove(listener); } } diff --git a/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/src/main/java/org/redisson/pubsub/PublishSubscribe.java index 793deea88..eefab4ab0 100644 --- a/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -16,6 +16,7 @@ package org.redisson.pubsub; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.PubSubEntry; import org.redisson.client.BaseRedisPubSubListener; @@ -23,6 +24,7 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.connection.ConnectionManager; +import org.redisson.misc.PromiseDelegator; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; @@ -57,10 +59,16 @@ abstract class PublishSubscribe> { } public Future subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) { - final Promise newPromise = connectionManager.newPromise(); - + final AtomicReference listenerHolder = new AtomicReference(); final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); - semaphore.acquire(new Runnable() { + final Promise newPromise = new PromiseDelegator(connectionManager.newPromise()) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return semaphore.remove(listenerHolder.get()); + } + }; + + Runnable listener = new Runnable() { @Override public void run() { @@ -86,7 +94,9 @@ abstract class PublishSubscribe> { RedisPubSubListener listener = createListener(channelName, value); connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore); } - }); + }; + semaphore.acquire(listener); + listenerHolder.set(listener); return newPromise; } diff --git a/src/test/java/org/redisson/RedissonLockHeavyTest.java b/src/test/java/org/redisson/RedissonLockHeavyTest.java index d896e221b..3e25bff47 100644 --- a/src/test/java/org/redisson/RedissonLockHeavyTest.java +++ b/src/test/java/org/redisson/RedissonLockHeavyTest.java @@ -4,6 +4,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.junit.FixMethodOrder; @@ -68,5 +69,46 @@ public class RedissonLockHeavyTest extends BaseTest { executor.awaitTermination(threads * loops, TimeUnit.SECONDS); } + + @Test + public void tryLockUnlockRLock() throws Exception { + for (int i = 0; i < threads; i++) { + + Runnable worker = new Runnable() { + + @Override + public void run() { + for (int j = 0; j < loops; j++) { + RLock lock = redisson.getLock("RLOCK_" + j); + try { + if (lock.tryLock(ThreadLocalRandom.current().nextInt(10), TimeUnit.MILLISECONDS)) { + try { + RBucket bucket = redisson.getBucket("RBUCKET_" + j); + bucket.set("TEST", 30, TimeUnit.SECONDS); + RSemaphore semaphore = redisson.getSemaphore("SEMAPHORE_" + j); + semaphore.release(); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } finally { + lock.unlock(); + } + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + executor.execute(worker); + } + executor.shutdown(); + executor.awaitTermination(threads * loops, TimeUnit.SECONDS); + + } + } \ No newline at end of file