diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index ed6cb1fc7..8f3eeef1f 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -55,7 +55,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown try { promise.await(); - while (getCountInner() > 0) { + while (getCount() > 0) { // waiting for open state RedissonCountDownLatchEntry entry = getEntry(); if (entry != null) { @@ -76,7 +76,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } time = unit.toMillis(time); - while (getCountInner() > 0) { + while (getCount() > 0) { if (time <= 0) { return false; } @@ -111,13 +111,16 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public void countDown() { - Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local v = redis.call('decr', KEYS[1]);" + + get(countDownAsync()); + } + + @Override + public Future countDownAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local v = redis.call('decr', KEYS[1]);" + "if v <= 0 then redis.call('del', KEYS[1]) end;" + - "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;" + - "return 1", - Arrays.asList(getName(), getChannelName()), zeroCountMessage); - get(f); + "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;", + Arrays.asList(getName(), getChannelName()), zeroCountMessage); } private String getEntryName() { @@ -130,21 +133,22 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public long getCount() { - return getCountInner(); + return get(getCountAsync()); } - private long getCountInner() { - Future f = commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET, getName()); - Long val = get(f); - if (val == null) { - return 0; - } - return val; + @Override + public Future getCountAsync() { + return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); } @Override public boolean trySetCount(long count) { - Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return get(trySetCountAsync(count)); + } + + @Override + public Future trySetCountAsync(long count) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('publish', KEYS[2], ARGV[1]); " @@ -153,7 +157,6 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown + "return 0 " + "end", Arrays.asList(getName(), getChannelName()), newCountMessage, count); - return get(f); } @Override diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index dfe80442f..b5b088035 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -29,6 +29,7 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.KeyValueConvertor; +import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; @@ -200,6 +201,7 @@ public interface RedisCommands { RedisStrictCommand DEL_VOID = new RedisStrictCommand("DEL", new VoidReplayConvertor()); RedisCommand GET = new RedisCommand("GET"); + RedisStrictCommand GET_LONG = new RedisStrictCommand("GET", new LongReplayConvertor()); RedisCommand GETSET = new RedisCommand("GETSET", 2); RedisCommand SET = new RedisCommand("SET", new VoidReplayConvertor(), 2); RedisCommand SETPXNX = new RedisCommand("SET", new BooleanNotNullReplayConvertor(), 2); diff --git a/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java b/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java index da7f45606..d50db5a34 100644 --- a/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java +++ b/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java @@ -19,6 +19,9 @@ public class LongReplayConvertor extends SingleConvertor { @Override public Long convert(Object obj) { + if (obj == null) { + return 0L; + } return Long.valueOf(obj.toString()); } diff --git a/src/main/java/org/redisson/core/RCountDownLatch.java b/src/main/java/org/redisson/core/RCountDownLatch.java index 2488e2b04..5f99af2bc 100644 --- a/src/main/java/org/redisson/core/RCountDownLatch.java +++ b/src/main/java/org/redisson/core/RCountDownLatch.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; * @author Nikita Koksharov * */ -public interface RCountDownLatch extends RObject { +public interface RCountDownLatch extends RObject, RCountDownLatchAsync { /** * Causes the current thread to wait until the latch has counted down to diff --git a/src/main/java/org/redisson/core/RCountDownLatchAsync.java b/src/main/java/org/redisson/core/RCountDownLatchAsync.java new file mode 100644 index 000000000..71e9600d0 --- /dev/null +++ b/src/main/java/org/redisson/core/RCountDownLatchAsync.java @@ -0,0 +1,63 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import io.netty.util.concurrent.Future; + +/** + * Distributed alternative to the {@link java.util.concurrent.CountDownLatch} + * + * It has an advantage over {@link java.util.concurrent.CountDownLatch} -- + * count can be set via {@link #trySetCount} method. + * + * @author Nikita Koksharov + * + */ +public interface RCountDownLatchAsync extends RObjectAsync { + + /** + * Decrements the count of the latch, releasing all waiting threads if + * the count reaches zero. + * + *

If the current count is greater than zero then it is decremented. + * If the new count is zero then all waiting threads are re-enabled for + * thread scheduling purposes. + * + *

If the current count equals zero then nothing happens. + */ + Future countDownAsync(); + + /** + * Returns the current count. + * + *

This method is typically used for debugging and testing purposes. + * + * @return the current count + */ + Future getCountAsync(); + + /** + * Sets new count value only if previous count already has reached zero + * or is not set at all. + * + * @param count - number of times {@link #countDown} must be invoked + * before threads can pass through {@link #await} + * @return true if new count setted + * false if previous count has not reached zero + */ + Future trySetCountAsync(long count); + +} diff --git a/src/test/java/org/redisson/RedissonCountDownLatchTest.java b/src/test/java/org/redisson/RedissonCountDownLatchTest.java index 6e2e76789..39caccbca 100644 --- a/src/test/java/org/redisson/RedissonCountDownLatchTest.java +++ b/src/test/java/org/redisson/RedissonCountDownLatchTest.java @@ -7,6 +7,7 @@ import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; import org.redisson.core.RCountDownLatch; +import static org.assertj.core.api.Assertions.*; public class RedissonCountDownLatchTest extends BaseTest { @@ -143,7 +144,13 @@ public class RedissonCountDownLatchTest extends BaseTest { @Test public void testTrySetCount() throws Exception { RCountDownLatch latch = redisson.getCountDownLatch("latch"); - Assert.assertTrue(latch.trySetCount(1)); - Assert.assertFalse(latch.trySetCount(2)); + assertThat(latch.trySetCount(1)).isTrue(); + assertThat(latch.trySetCount(2)).isFalse(); + } + + @Test + public void testCount() { + RCountDownLatch latch = redisson.getCountDownLatch("latch"); + assertThat(latch.getCount()).isEqualTo(0); } }