RedissonCountDownLatch "promised"

pull/25/head
Nikita 11 years ago
parent b053ccc525
commit 5cdf69aac4

@ -216,7 +216,6 @@ public class Redisson {
}
}
latch.subscribe();
return latch;
}

@ -15,9 +15,11 @@
*/
package org.redisson;
import java.util.concurrent.CountDownLatch;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
@ -38,14 +40,12 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
*/
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {
private final CountDownLatch subscribeLatch = new CountDownLatch(1);
private final String groupName = "redisson_countdownlatch_";
private static final Integer zeroCountMessage = 0;
private static final Integer newCountMessage = 1;
private final AtomicBoolean subscribeOnce = new AtomicBoolean();
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>();
private final ReclosableLatch msg = new ReclosableLatch();
@ -55,43 +55,48 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
super(connectionManager, name);
}
public void subscribe() {
if (subscribeOnce.compareAndSet(false, true)) {
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
private Future<Boolean> subscribe() {
Promise<Boolean> p = promise.get();
if (p != null) {
return p;
}
@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
subscribeLatch.countDown();
}
}
final Promise<Boolean> newPromise = newPromise();
if (!promise.compareAndSet(null, newPromise)) {
return promise.get();
}
@Override
public void message(String channel, Integer message) {
if (!getChannelName().equals(channel)) {
return;
}
if (message.equals(zeroCountMessage)) {
msg.open();
}
if (message.equals(newCountMessage)) {
msg.close();
}
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
newPromise.setSuccess(true);
}
}
};
@Override
public void message(String channel, Integer message) {
if (!getChannelName().equals(channel)) {
return;
}
if (message.equals(zeroCountMessage)) {
msg.open();
}
if (message.equals(newCountMessage)) {
msg.close();
}
}
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
}
};
try {
subscribeLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
return newPromise;
}
public void await() throws InterruptedException {
subscribe().await();
while (getCount() > 0) {
// waiting for open state
msg.await();
@ -101,6 +106,8 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
subscribe().await(time, unit);
time = unit.toMillis(time);
while (getCount() > 0) {
if (time <= 0) {
@ -118,6 +125,8 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public void countDown() {
subscribe().awaitUninterruptibly();
if (getCount() <= 0) {
return;
}
@ -146,6 +155,8 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public long getCount() {
subscribe().awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionReadOp();
try {
Number val = (Number) connection.get(getName());
@ -160,6 +171,8 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean trySetCount(long count) {
subscribe().awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionWriteOp();
try {
connection.watch(getName());

@ -15,6 +15,8 @@
*/
package org.redisson;
import io.netty.util.concurrent.Promise;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RObject;
@ -36,6 +38,10 @@ abstract class RedissonObject implements RObject {
this.name = name;
}
protected <V> Promise<V> newPromise() {
return connectionManager.getGroup().next().<V>newPromise();
}
public String getName() {
return name;
}

@ -61,7 +61,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
return;
}
final Promise<Boolean> newPromise = connectionManager.getGroup().next().<Boolean>newPromise();
final Promise<Boolean> newPromise = newPromise();
if (!promise.compareAndSet(null, newPromise)) {
return;
}

Loading…
Cancel
Save