Merge branch 'mrniko/master'

pull/282/head
jackygurui 10 years ago
commit 8b587cf16b

@ -54,24 +54,32 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
this.id = id;
}
private Future<Boolean> subscribe() {
Promise<Boolean> promise = aquire();
if (promise != null) {
return promise;
}
private Future<RedissonCountDownLatchEntry> subscribe() {
synchronized (ENTRIES) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.aquire();
return entry.getPromise();
}
Promise<RedissonCountDownLatchEntry> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();
Promise<Boolean> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
return subscribe();
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}
return oldPromise;
RedisPubSubListener<Integer> listener = createListener(value);
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
return newPromise;
}
}
private RedisPubSubListener<Integer> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
@Override
@ -89,61 +97,32 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()
&& type == PubSubType.SUBSCRIBE) {
value.getPromise().setSuccess(value);
return true;
}
return false;
}
};
synchronized (ENTRIES) {
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
}
return newPromise;
}
private void unsubscribe() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return;
}
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.release();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (newEntry.isFree()
&& ENTRIES.remove(getEntryName(), newEntry)) {
synchronized (ENTRIES) {
// maybe added during subscription
if (!ENTRIES.containsKey(getEntryName())) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
}
return;
}
}
return listener;
}
private Promise<Boolean> aquire() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return newEntry.getPromise();
private void unsubscribe(RedissonCountDownLatchEntry entry) {
synchronized (ENTRIES) {
if (entry.release() == 0) {
// just an assertion
boolean removed = ENTRIES.remove(getEntryName()) == entry;
if (removed) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
} else {
return null;
}
}
}
public void await() throws InterruptedException {
Future<Boolean> promise = subscribe();
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
promise.await();
@ -155,14 +134,14 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
}
} finally {
unsubscribe();
unsubscribe(promise.getNow());
}
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
Future<Boolean> promise = subscribe();
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
if (!promise.await(time, unit)) {
return false;
@ -186,7 +165,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return true;
} finally {
unsubscribe();
unsubscribe(promise.getNow());
}
}

@ -15,69 +15,37 @@
*/
package org.redisson;
import io.netty.util.concurrent.Promise;
import org.redisson.misc.ReclosableLatch;
import io.netty.util.concurrent.Promise;
public class RedissonCountDownLatchEntry {
private int counter;
private final ReclosableLatch latch;
private final Promise<Boolean> promise;
public RedissonCountDownLatchEntry(RedissonCountDownLatchEntry source) {
counter = source.counter;
latch = source.latch;
promise = source.promise;
}
public RedissonCountDownLatchEntry(Promise<Boolean> promise) {
private final Promise<RedissonCountDownLatchEntry> promise;
public RedissonCountDownLatchEntry(Promise<RedissonCountDownLatchEntry> promise) {
super();
this.latch = new ReclosableLatch();
this.promise = promise;
}
public boolean isFree() {
return counter == 0;
}
public void aquire() {
counter++;
}
public void release() {
counter--;
public int release() {
return --counter;
}
public Promise<Boolean> getPromise() {
public Promise<RedissonCountDownLatchEntry> getPromise() {
return promise;
}
public ReclosableLatch getLatch() {
return latch;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + counter;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RedissonCountDownLatchEntry other = (RedissonCountDownLatchEntry) obj;
if (counter != other.counter)
return false;
return true;
}
}

@ -58,26 +58,14 @@ public class RedissonLock extends RedissonExpirable implements RLock {
this.id = id;
}
private void unsubscribe() {
while (true) {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return;
}
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.release();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (newEntry.isFree()
&& ENTRIES.remove(getEntryName(), newEntry)) {
synchronized (ENTRIES) {
// maybe added during subscription
if (!ENTRIES.containsKey(getEntryName())) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
private void unsubscribe(RedissonLockEntry entry) {
synchronized (ENTRIES) {
if (entry.release() == 0) {
// just an assertion
boolean removed = ENTRIES.remove(getEntryName()) == entry;
if (removed) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
return;
}
}
}
@ -86,63 +74,49 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return id + ":" + getName();
}
private Promise<Boolean> aquire() {
while (true) {
private Future<RedissonLockEntry> subscribe() {
synchronized (ENTRIES) {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return null;
}
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return newEntry.getPromise();
if (entry != null) {
entry.aquire();
return entry.getPromise();
}
}
}
private Future<Boolean> subscribe() {
Promise<Boolean> promise = aquire();
if (promise != null) {
return promise;
}
Promise<RedissonLockEntry> newPromise = newPromise();
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
value.aquire();
Promise<Boolean> newPromise = newPromise();
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
value.aquire();
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
return subscribe();
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}
return oldPromise;
}
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
@Override
public void onMessage(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
value.getLatch().release();
@Override
public void onMessage(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
value.getLatch().release();
}
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
return true;
@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName())
&& !value.getPromise().isSuccess()
&& type == PubSubType.SUBSCRIBE) {
value.getPromise().setSuccess(value);
return true;
}
return false;
}
return false;
}
};
};
synchronized (ENTRIES) {
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
return newPromise;
}
return newPromise;
}
private String getChannelName() {
@ -186,7 +160,8 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return;
}
subscribe().awaitUninterruptibly();
Future<RedissonLockEntry> future = subscribe();
future.awaitUninterruptibly();
try {
while (true) {
@ -209,7 +184,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
}
} finally {
unsubscribe();
unsubscribe(future.getNow());
}
}
@ -290,7 +265,8 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return true;
}
if (!subscribe().awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
Future<RedissonLockEntry> future = subscribe();
if (!future.awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
return false;
}
@ -325,7 +301,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
return true;
} finally {
unsubscribe();
unsubscribe(future.getNow());
}
}

@ -24,36 +24,32 @@ public class RedissonLockEntry {
private int counter;
private final Semaphore latch;
private final Promise<Boolean> promise;
private final Promise<RedissonLockEntry> promise;
public RedissonLockEntry(RedissonLockEntry source) {
counter = source.counter;
latch = source.latch;
promise = source.promise;
}
public RedissonLockEntry(Promise<Boolean> promise) {
public RedissonLockEntry(Promise<RedissonLockEntry> promise) {
super();
this.latch = new Semaphore(0);
this.promise = promise;
}
public boolean isFree() {
return counter == 0;
}
public void aquire() {
counter++;
}
public void release() {
counter--;
public int release() {
return --counter;
}
public Promise<Boolean> getPromise() {
public Promise<RedissonLockEntry> getPromise() {
return promise;
}
public Semaphore getLatch() {
return latch;
}
@ -79,5 +75,5 @@ public class RedissonLockEntry {
return false;
return true;
}
}

Loading…
Cancel
Save