Fixed exception during channel re-subscription. #87

pull/92/head
Nikita 10 years ago
parent 57dc4fcde7
commit f80a6794e7

@ -49,7 +49,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private static final Integer newCountMessage = 1;
private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = new ConcurrentHashMap<String, RedissonCountDownLatchEntry>();
private final UUID id;
RedissonCountDownLatch(ConnectionManager connectionManager, String name, UUID id) {
@ -74,12 +74,13 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
return oldPromise;
}
RedisPubSubAdapter<Integer> listener = new RedisPubSubAdapter<Integer>() {
@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
if (getChannelName().equals(channel)
&& !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}
@ -112,7 +113,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.release();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (newEntry.isFree()
if (newEntry.isFree()
&& ENTRIES.remove(getEntryName(), newEntry)) {
Future future = connectionManager.unsubscribe(getChannelName());
future.awaitUninterruptibly();
@ -121,7 +122,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
}
}
private Promise<Boolean> aquire() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
@ -141,7 +142,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
Future<Boolean> promise = subscribe();
try {
promise.await();
while (getCountInner() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
@ -162,7 +163,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
if (!promise.await(time, unit)) {
return false;
}
time = unit.toMillis(time);
while (getCountInner() > 0) {
if (time <= 0) {
@ -178,7 +179,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
long elapsed = System.currentTimeMillis() - current;
time = time - elapsed;
}
return true;
} finally {
release();
@ -213,7 +214,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private String getEntryName() {
return id + getName();
}
private String getChannelName() {
return groupName + getName();
}
@ -230,7 +231,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return async.get(getName());
}
});
if (val == null) {
return 0;
}
@ -256,7 +257,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
});
}
@Override
public void delete() {
connectionManager.write(new SyncOperation<Object, Void>() {

@ -178,7 +178,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
if (getChannelName().equals(channel)
&& !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}

@ -33,7 +33,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
public void testExpire() throws InterruptedException {
RLock lock = redisson.getLock("lock");
lock.lock(2, TimeUnit.SECONDS);
final long startTime = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
@ -41,7 +41,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
RLock lock1 = redisson.getLock("lock");
lock1.lock();
long spendTime = System.currentTimeMillis() - startTime;
Assert.assertTrue(spendTime < 2005);
Assert.assertTrue(spendTime < 2010);
lock1.unlock();
latch.countDown();
};
@ -51,7 +51,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
lock.unlock();
}
@Test
public void testGetHoldCount() {
RLock lock = redisson.getLock("lock");

Loading…
Cancel
Save