RedissonCountDownLatchAsync implemented #306

pull/382/head
Nikita 9 years ago
parent b6e7cdd062
commit ea0db454f3

@ -55,7 +55,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
try {
promise.await();
while (getCountInner() > 0) {
while (getCount() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
@ -76,7 +76,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
time = unit.toMillis(time);
while (getCountInner() > 0) {
while (getCount() > 0) {
if (time <= 0) {
return false;
}
@ -111,13 +111,16 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public void countDown() {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
get(countDownAsync());
}
@Override
public Future<Void> countDownAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;" +
"return 1",
Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
get(f);
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
}
private String getEntryName() {
@ -130,21 +133,22 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public long getCount() {
return getCountInner();
return get(getCountAsync());
}
private long getCountInner() {
Future<Long> f = commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET, getName());
Long val = get(f);
if (val == null) {
return 0;
}
return val;
@Override
public Future<Long> getCountAsync() {
return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
}
@Override
public boolean trySetCount(long count) {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return get(trySetCountAsync(count));
}
@Override
public Future<Boolean> trySetCountAsync(long count) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
@ -153,7 +157,6 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
return get(f);
}
@Override

@ -29,6 +29,7 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.KeyValueConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
@ -200,6 +201,7 @@ public interface RedisCommands {
RedisStrictCommand<Void> DEL_VOID = new RedisStrictCommand<Void>("DEL", new VoidReplayConvertor());
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET", 2);
RedisCommand<Void> SET = new RedisCommand<Void>("SET", new VoidReplayConvertor(), 2);
RedisCommand<Boolean> SETPXNX = new RedisCommand<Boolean>("SET", new BooleanNotNullReplayConvertor(), 2);

@ -19,6 +19,9 @@ public class LongReplayConvertor extends SingleConvertor<Long> {
@Override
public Long convert(Object obj) {
if (obj == null) {
return 0L;
}
return Long.valueOf(obj.toString());
}

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
* @author Nikita Koksharov
*
*/
public interface RCountDownLatch extends RObject {
public interface RCountDownLatch extends RObject, RCountDownLatchAsync {
/**
* Causes the current thread to wait until the latch has counted down to

@ -0,0 +1,63 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import io.netty.util.concurrent.Future;
/**
* Distributed alternative to the {@link java.util.concurrent.CountDownLatch}
*
* It has an advantage over {@link java.util.concurrent.CountDownLatch} --
* count can be set via {@link #trySetCount} method.
*
* @author Nikita Koksharov
*
*/
public interface RCountDownLatchAsync extends RObjectAsync {
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
Future<Void> countDownAsync();
/**
* Returns the current count.
*
* <p>This method is typically used for debugging and testing purposes.
*
* @return the current count
*/
Future<Long> getCountAsync();
/**
* Sets new count value only if previous count already has reached zero
* or is not set at all.
*
* @param count - number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @return <code>true</code> if new count setted
* <code>false</code> if previous count has not reached zero
*/
Future<Boolean> trySetCountAsync(long count);
}

@ -7,6 +7,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RCountDownLatch;
import static org.assertj.core.api.Assertions.*;
public class RedissonCountDownLatchTest extends BaseTest {
@ -143,7 +144,13 @@ public class RedissonCountDownLatchTest extends BaseTest {
@Test
public void testTrySetCount() throws Exception {
RCountDownLatch latch = redisson.getCountDownLatch("latch");
Assert.assertTrue(latch.trySetCount(1));
Assert.assertFalse(latch.trySetCount(2));
assertThat(latch.trySetCount(1)).isTrue();
assertThat(latch.trySetCount(2)).isFalse();
}
@Test
public void testCount() {
RCountDownLatch latch = redisson.getCountDownLatch("latch");
assertThat(latch.getCount()).isEqualTo(0);
}
}

Loading…
Cancel
Save