From d05f8c154d5b83c000a3b1afd547084262161dcf Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Sep 2015 17:54:28 +0300 Subject: [PATCH] RedissonCountDownLatch & RedissonLock internal subscribe/unsubscribe sync simplified --- .../org/redisson/RedissonCountDownLatch.java | 95 ++++++--------- .../redisson/RedissonCountDownLatchEntry.java | 56 ++------- src/main/java/org/redisson/RedissonLock.java | 108 +++++++----------- .../java/org/redisson/RedissonLockEntry.java | 28 ++--- 4 files changed, 103 insertions(+), 184 deletions(-) diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 71770af8e..6223555b7 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -54,24 +54,32 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown this.id = id; } - private Future subscribe() { - Promise promise = aquire(); - if (promise != null) { - return promise; - } + private Future subscribe() { + synchronized (ENTRIES) { + RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); + if (entry != null) { + entry.aquire(); + return entry.getPromise(); + } + + Promise newPromise = newPromise(); + final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise); + value.aquire(); - Promise newPromise = newPromise(); - final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise); - value.aquire(); - RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); - if (oldValue != null) { - Promise oldPromise = aquire(); - if (oldPromise == null) { - return subscribe(); + RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); + if (oldValue != null) { + oldValue.aquire(); + return oldValue.getPromise(); } - return oldPromise; + + RedisPubSubListener listener = createListener(value); + + commandExecutor.getConnectionManager().subscribe(listener, getChannelName()); + return newPromise; } + } + private RedisPubSubListener createListener(final RedissonCountDownLatchEntry value) { RedisPubSubListener listener = new BaseRedisPubSubListener() { @Override @@ -89,61 +97,32 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public boolean onStatus(PubSubType type, String channel) { - if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) { - value.getPromise().setSuccess(true); + if (channel.equals(getChannelName()) && !value.getPromise().isSuccess() + && type == PubSubType.SUBSCRIBE) { + value.getPromise().setSuccess(value); return true; } return false; } }; - - synchronized (ENTRIES) { - commandExecutor.getConnectionManager().subscribe(listener, getChannelName()); - } - return newPromise; - } - - private void unsubscribe() { - while (true) { - RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); - if (entry == null) { - return; - } - RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry); - newEntry.release(); - if (ENTRIES.replace(getEntryName(), entry, newEntry)) { - if (newEntry.isFree() - && ENTRIES.remove(getEntryName(), newEntry)) { - synchronized (ENTRIES) { - // maybe added during subscription - if (!ENTRIES.containsKey(getEntryName())) { - commandExecutor.getConnectionManager().unsubscribe(getChannelName()); - } - } - } - return; - } - } + return listener; } - private Promise aquire() { - while (true) { - RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); - if (entry != null) { - RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry); - newEntry.aquire(); - if (ENTRIES.replace(getEntryName(), entry, newEntry)) { - return newEntry.getPromise(); + private void unsubscribe(RedissonCountDownLatchEntry entry) { + synchronized (ENTRIES) { + if (entry.release() == 0) { + // just an assertion + boolean removed = ENTRIES.remove(getEntryName()) == entry; + if (removed) { + commandExecutor.getConnectionManager().unsubscribe(getChannelName()); } - } else { - return null; } } } public void await() throws InterruptedException { - Future promise = subscribe(); + Future promise = subscribe(); try { promise.await(); @@ -155,14 +134,14 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } } } finally { - unsubscribe(); + unsubscribe(promise.getNow()); } } @Override public boolean await(long time, TimeUnit unit) throws InterruptedException { - Future promise = subscribe(); + Future promise = subscribe(); try { if (!promise.await(time, unit)) { return false; @@ -186,7 +165,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return true; } finally { - unsubscribe(); + unsubscribe(promise.getNow()); } } diff --git a/src/main/java/org/redisson/RedissonCountDownLatchEntry.java b/src/main/java/org/redisson/RedissonCountDownLatchEntry.java index 956801143..181d173f1 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatchEntry.java +++ b/src/main/java/org/redisson/RedissonCountDownLatchEntry.java @@ -15,69 +15,37 @@ */ package org.redisson; -import io.netty.util.concurrent.Promise; - import org.redisson.misc.ReclosableLatch; +import io.netty.util.concurrent.Promise; + public class RedissonCountDownLatchEntry { private int counter; private final ReclosableLatch latch; - private final Promise promise; - - public RedissonCountDownLatchEntry(RedissonCountDownLatchEntry source) { - counter = source.counter; - latch = source.latch; - promise = source.promise; - } - - public RedissonCountDownLatchEntry(Promise promise) { + private final Promise promise; + + public RedissonCountDownLatchEntry(Promise promise) { super(); this.latch = new ReclosableLatch(); this.promise = promise; } - - public boolean isFree() { - return counter == 0; - } - + public void aquire() { counter++; } - - public void release() { - counter--; + + public int release() { + return --counter; } - - public Promise getPromise() { + + public Promise getPromise() { return promise; } - + public ReclosableLatch getLatch() { return latch; } - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + counter; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - RedissonCountDownLatchEntry other = (RedissonCountDownLatchEntry) obj; - if (counter != other.counter) - return false; - return true; - } - } diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 2ec614f71..e18e20fb4 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -58,26 +58,14 @@ public class RedissonLock extends RedissonExpirable implements RLock { this.id = id; } - private void unsubscribe() { - while (true) { - RedissonLockEntry entry = ENTRIES.get(getEntryName()); - if (entry == null) { - return; - } - - RedissonLockEntry newEntry = new RedissonLockEntry(entry); - newEntry.release(); - if (ENTRIES.replace(getEntryName(), entry, newEntry)) { - if (newEntry.isFree() - && ENTRIES.remove(getEntryName(), newEntry)) { - synchronized (ENTRIES) { - // maybe added during subscription - if (!ENTRIES.containsKey(getEntryName())) { - commandExecutor.getConnectionManager().unsubscribe(getChannelName()); - } - } + private void unsubscribe(RedissonLockEntry entry) { + synchronized (ENTRIES) { + if (entry.release() == 0) { + // just an assertion + boolean removed = ENTRIES.remove(getEntryName()) == entry; + if (removed) { + commandExecutor.getConnectionManager().unsubscribe(getChannelName()); } - return; } } } @@ -86,63 +74,49 @@ public class RedissonLock extends RedissonExpirable implements RLock { return id + ":" + getName(); } - private Promise aquire() { - while (true) { + private Future subscribe() { + synchronized (ENTRIES) { RedissonLockEntry entry = ENTRIES.get(getEntryName()); - if (entry == null) { - return null; - } - - RedissonLockEntry newEntry = new RedissonLockEntry(entry); - newEntry.aquire(); - if (ENTRIES.replace(getEntryName(), entry, newEntry)) { - return newEntry.getPromise(); + if (entry != null) { + entry.aquire(); + return entry.getPromise(); } - } - } - private Future subscribe() { - Promise promise = aquire(); - if (promise != null) { - return promise; - } + Promise newPromise = newPromise(); + final RedissonLockEntry value = new RedissonLockEntry(newPromise); + value.aquire(); - Promise newPromise = newPromise(); - final RedissonLockEntry value = new RedissonLockEntry(newPromise); - value.aquire(); - RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); - if (oldValue != null) { - Promise oldPromise = aquire(); - if (oldPromise == null) { - return subscribe(); + RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); + if (oldValue != null) { + oldValue.aquire(); + return oldValue.getPromise(); } - return oldPromise; - } - RedisPubSubListener listener = new BaseRedisPubSubListener() { + RedisPubSubListener listener = new BaseRedisPubSubListener() { - @Override - public void onMessage(String channel, Integer message) { - if (message.equals(unlockMessage) && getChannelName().equals(channel)) { - value.getLatch().release(); + @Override + public void onMessage(String channel, Integer message) { + if (message.equals(unlockMessage) && getChannelName().equals(channel)) { + value.getLatch().release(); + } } - } - @Override - public boolean onStatus(PubSubType type, String channel) { - if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) { - value.getPromise().setSuccess(true); - return true; + @Override + public boolean onStatus(PubSubType type, String channel) { + if (channel.equals(getChannelName()) + && !value.getPromise().isSuccess() + && type == PubSubType.SUBSCRIBE) { + value.getPromise().setSuccess(value); + return true; + } + return false; } - return false; - } - }; + }; - synchronized (ENTRIES) { commandExecutor.getConnectionManager().subscribe(listener, getChannelName()); + return newPromise; } - return newPromise; } private String getChannelName() { @@ -186,7 +160,8 @@ public class RedissonLock extends RedissonExpirable implements RLock { return; } - subscribe().awaitUninterruptibly(); + Future future = subscribe(); + future.awaitUninterruptibly(); try { while (true) { @@ -209,7 +184,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } } } finally { - unsubscribe(); + unsubscribe(future.getNow()); } } @@ -290,7 +265,8 @@ public class RedissonLock extends RedissonExpirable implements RLock { return true; } - if (!subscribe().awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) { + Future future = subscribe(); + if (!future.awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) { return false; } @@ -325,7 +301,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } return true; } finally { - unsubscribe(); + unsubscribe(future.getNow()); } } diff --git a/src/main/java/org/redisson/RedissonLockEntry.java b/src/main/java/org/redisson/RedissonLockEntry.java index ca274e7fe..ca56333ab 100644 --- a/src/main/java/org/redisson/RedissonLockEntry.java +++ b/src/main/java/org/redisson/RedissonLockEntry.java @@ -24,36 +24,32 @@ public class RedissonLockEntry { private int counter; private final Semaphore latch; - private final Promise promise; - + private final Promise promise; + public RedissonLockEntry(RedissonLockEntry source) { counter = source.counter; latch = source.latch; promise = source.promise; } - - public RedissonLockEntry(Promise promise) { + + public RedissonLockEntry(Promise promise) { super(); this.latch = new Semaphore(0); this.promise = promise; } - - public boolean isFree() { - return counter == 0; - } - + public void aquire() { counter++; } - - public void release() { - counter--; + + public int release() { + return --counter; } - - public Promise getPromise() { + + public Promise getPromise() { return promise; } - + public Semaphore getLatch() { return latch; } @@ -79,5 +75,5 @@ public class RedissonLockEntry { return false; return true; } - + }