CountDownLatch improvements

pull/6/head
Nikita 11 years ago
parent 5e91d961a6
commit f44997a8ee

@ -16,12 +16,11 @@
package org.redisson;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.core.RCountDownLatch;
import org.redisson.misc.internal.ThreadLocalSemaphore;
import org.redisson.misc.internal.ReclosableLatch;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
@ -41,7 +40,7 @@ public class RedissonCountDownLatch implements RCountDownLatch {
private final AtomicBoolean subscribeOnce = new AtomicBoolean();
private final ThreadLocalSemaphore msg = new ThreadLocalSemaphore();
private final ReclosableLatch msg = new ReclosableLatch();
RedissonCountDownLatch(Redisson redisson, RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, String name) {
this.connection = connection;
@ -63,9 +62,7 @@ public class RedissonCountDownLatch implements RCountDownLatch {
@Override
public void message(Object channel, Object message) {
if (message.equals(unlockMessage)) {
for (Semaphore s : msg.getAll()) {
s.release();
}
msg.open();
}
}
@ -84,9 +81,8 @@ public class RedissonCountDownLatch implements RCountDownLatch {
public void await() throws InterruptedException {
while (getCount() > 0) {
// waiting for message
msg.get().acquire();
msg.await();
}
msg.remove();
}
@ -95,17 +91,15 @@ public class RedissonCountDownLatch implements RCountDownLatch {
time = unit.toMillis(time);
while (getCount() > 0) {
if (time <= 0) {
msg.remove();
return false;
}
long current = System.currentTimeMillis();
// waiting for message
msg.get().tryAcquire(time, TimeUnit.MILLISECONDS);
msg.await(time, TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis() - current;
time = time - elapsed;
}
msg.remove();
return true;
}
@ -139,7 +133,11 @@ public class RedissonCountDownLatch implements RCountDownLatch {
@Override
public boolean trySetCount(long count) {
return connection.setnx(name, count);
Boolean res = connection.setnx(name, count);
if (res) {
msg.close();
}
return res;
}
@Override

@ -103,6 +103,7 @@ public class RedissonLock implements RLock {
private final RedisPubSubConnection<Object, Object> pubSubConnection;
private final RedisConnection<Object, Object> connection;
// TODO move it Redisson as ID
private final UUID id = UUID.randomUUID();
private final String groupName = "redisson_lock";
private final String name;

@ -0,0 +1,93 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2009 Red Hat Inc. and/or its affiliates and other
* contributors as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.redisson.misc.internal;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* A thread gate, that uses an {@link java.util.concurrent.locks.AbstractQueuedSynchronizer}.
* <p/>
* This implementation allows you to create a latch with a default state (open or closed), and repeatedly open or close
* the latch.
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
* @since 4.0
*/
public class ReclosableLatch extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1744280161777661090l;
// the following states are used in the AQS.
private static final int OPEN_STATE = 0, CLOSED_STATE = 1;
public ReclosableLatch() {
setState(CLOSED_STATE);
}
public ReclosableLatch(boolean defaultOpen) {
setState(defaultOpen ? OPEN_STATE : CLOSED_STATE);
}
@Override
public final int tryAcquireShared(int ignored) {
// return 1 if we allow the requestor to proceed, -1 if we want the requestor to block.
return getState() == OPEN_STATE ? 1 : -1;
}
@Override
public final boolean tryReleaseShared(int state) {
// used as a mechanism to set the state of the Sync.
setState(state);
return true;
}
public final void open() {
// do not use setState() directly since this won't notify parked threads.
releaseShared(OPEN_STATE);
}
public final void close() {
// do not use setState() directly since this won't notify parked threads.
releaseShared(CLOSED_STATE);
}
public boolean isOpened() {
return getState() == OPEN_STATE;
}
public final void await() throws InterruptedException {
acquireSharedInterruptibly(1); // the 1 is a dummy value that is not used.
}
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy value that is not used.
}
@Override
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return "ReclosableLatch [State = " + s + ", " + q + "empty queue]";
}
}

@ -3,6 +3,7 @@ package org.redisson;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
@ -12,37 +13,36 @@ public class RedissonCountDownLatchConcurrentTest {
@Test
public void testSingleCountDownAwait_SingleInstance() throws InterruptedException {
int iterations = Runtime.getRuntime().availableProcessors()*2;
final int iterations = Runtime.getRuntime().availableProcessors()*3;
Redisson redisson = Redisson.create();
final RCountDownLatch latch = redisson.getCountDownLatch("latch");
latch.trySetCount(iterations);
ExecutorService executor = Executors.newFixedThreadPool(iterations);
final AtomicInteger counter = new AtomicInteger();
ExecutorService executor = Executors.newScheduledThreadPool(iterations);
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
latch.await();
Assert.assertEquals(0, latch.getCount());
Assert.assertEquals(iterations, counter.get());
} catch (InterruptedException e) {
Assert.fail();
}
latch.countDown();
}
});
}
executor = Executors.newScheduledThreadPool(iterations);
ExecutorService countDownExecutor = Executors.newFixedThreadPool(iterations);
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
countDownExecutor.execute(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
Assert.fail();
}
latch.countDown();
counter.incrementAndGet();
}
});
}

Loading…
Cancel
Save