diff --git a/src/main/java/org/redisson/RedissonAtomicLongReactive.java b/src/main/java/org/redisson/RedissonAtomicLongReactive.java new file mode 100644 index 000000000..e157dd603 --- /dev/null +++ b/src/main/java/org/redisson/RedissonAtomicLongReactive.java @@ -0,0 +1,109 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Collections; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.core.RAtomicLongReactive; + +import reactor.rx.Streams; + +/** + * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} + * + * @author Nikita Koksharov + * + */ +public class RedissonAtomicLongReactive extends RedissonExpirableReactive implements RAtomicLongReactive { + + protected RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + @Override + public Publisher addAndGet(long delta) { + return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.INCRBY, getName(), delta); + } + + @Override + public Publisher compareAndSet(long expect, long update) { + return commandExecutor.evalWriteObservable(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('get', KEYS[1]) == ARGV[1] then " + + "redis.call('set', KEYS[1], ARGV[2]); " + + "return true " + + "else " + + "return false end", + Collections.singletonList(getName()), expect, update); + } + + @Override + public Publisher decrementAndGet() { + return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName()); + } + + @Override + public Publisher get() { + return addAndGet(0); + } + + @Override + public Publisher getAndAdd(long delta) { + return commandExecutor.evalWriteObservable(getName(), + StringCodec.INSTANCE, RedisCommands.EVAL_LONG, + "local v = redis.call('get', KEYS[1]) or 0; " + + "redis.call('set', KEYS[1], v + ARGV[1]); " + + "return tonumber(v)", + Collections.singletonList(getName()), delta); + } + + + @Override + public Publisher getAndSet(long newValue) { + return commandExecutor.evalWriteObservable(getName(), + StringCodec.INSTANCE, RedisCommands.EVAL_LONG, + "local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)", + Collections.singletonList(getName()), newValue); + } + + @Override + public Publisher incrementAndGet() { + return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName()); + } + + @Override + public Publisher getAndIncrement() { + return getAndAdd(1); + } + + @Override + public Publisher getAndDecrement() { + return getAndAdd(-1); + } + + @Override + public Publisher set(long newValue) { + return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), newValue); + } + + public String toString() { + return Long.toString(Streams.create(get()).next().poll()); + } + +} diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index a959500e7..e0cbda282 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -29,6 +29,7 @@ import org.redisson.connection.ElasticacheConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SingleConnectionManager; +import org.redisson.core.RAtomicLongReactive; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RDequeReactive; @@ -214,6 +215,11 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonDequeReactive(codec, commandExecutor, name); } + @Override + public RAtomicLongReactive getAtomicLong(String name) { + return new RedissonAtomicLongReactive(commandExecutor, name); + } + public Config getConfig() { return config; } diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 7d5474970..fef72feb4 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -18,6 +18,7 @@ package org.redisson; import java.util.List; import org.redisson.client.codec.Codec; +import org.redisson.core.RAtomicLongReactive; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RDequeReactive; @@ -165,14 +166,14 @@ public interface RedissonReactiveClient { RDequeReactive getDeque(String name, Codec codec); -// /** -// * Returns "atomic long" instance by name. -// * -// * @param name of the "atomic long" -// * @return -// */ -// RAtomicLong getAtomicLong(String name); -// + /** + * Returns "atomic long" instance by name. + * + * @param name of the "atomic long" + * @return + */ + RAtomicLongReactive getAtomicLong(String name); + // /** // * Returns "count down latch" instance by name. // * diff --git a/src/main/java/org/redisson/core/RAtomicLongReactive.java b/src/main/java/org/redisson/core/RAtomicLongReactive.java new file mode 100644 index 000000000..4e290adcf --- /dev/null +++ b/src/main/java/org/redisson/core/RAtomicLongReactive.java @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import org.reactivestreams.Publisher; + +public interface RAtomicLongReactive extends RExpirableReactive { + + Publisher compareAndSet(long expect, long update); + + Publisher addAndGet(long delta); + + Publisher decrementAndGet(); + + Publisher get(); + + Publisher getAndAdd(long delta); + + Publisher getAndSet(long newValue); + + Publisher incrementAndGet(); + + Publisher getAndIncrement(); + + Publisher getAndDecrement(); + + Publisher set(long newValue); + +} diff --git a/src/test/java/org/redisson/RedissonAtomicLongReactiveTest.java b/src/test/java/org/redisson/RedissonAtomicLongReactiveTest.java new file mode 100644 index 000000000..f1cf3845a --- /dev/null +++ b/src/test/java/org/redisson/RedissonAtomicLongReactiveTest.java @@ -0,0 +1,61 @@ +package org.redisson; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.core.RAtomicLongReactive; + +public class RedissonAtomicLongReactiveTest extends BaseReactiveTest { + + @Test + public void testCompareAndSet() { + RAtomicLongReactive al = redisson.getAtomicLong("test"); + Assert.assertFalse(sync(al.compareAndSet(-1, 2))); + Assert.assertEquals(0, sync(al.get()).intValue()); + Assert.assertTrue(sync(al.compareAndSet(0, 2))); + Assert.assertEquals(2, sync(al.get()).intValue()); + } + + @Test + public void testSetThenIncrement() { + RAtomicLongReactive al = redisson.getAtomicLong("test"); + sync(al.set(2)); + Assert.assertEquals(2, sync(al.getAndIncrement()).intValue()); + Assert.assertEquals(3, sync(al.get()).intValue()); + } + + @Test + public void testIncrementAndGet() { + RAtomicLongReactive al = redisson.getAtomicLong("test"); + Assert.assertEquals(1, sync(al.incrementAndGet()).intValue()); + Assert.assertEquals(1, sync(al.get()).intValue()); + } + + @Test + public void testGetAndIncrement() { + RAtomicLongReactive al = redisson.getAtomicLong("test"); + Assert.assertEquals(0, sync(al.getAndIncrement()).intValue()); + Assert.assertEquals(1, sync(al.get()).intValue()); + } + + @Test + public void test() { + RAtomicLongReactive al = redisson.getAtomicLong("test"); + Assert.assertEquals(0, sync(al.get()).intValue()); + Assert.assertEquals(0, sync(al.getAndIncrement()).intValue()); + Assert.assertEquals(1, sync(al.get()).intValue()); + Assert.assertEquals(1, sync(al.getAndDecrement()).intValue()); + Assert.assertEquals(0, sync(al.get()).intValue()); + Assert.assertEquals(0, sync(al.getAndIncrement()).intValue()); + Assert.assertEquals(1, sync(al.getAndSet(12)).intValue()); + Assert.assertEquals(12, sync(al.get()).intValue()); + sync(al.set(1)); + + long state = sync(redisson.getAtomicLong("test").get()); + Assert.assertEquals(1, state); + sync(al.set(Long.MAX_VALUE - 1000)); + + long newState = sync(redisson.getAtomicLong("test").get()); + Assert.assertEquals(Long.MAX_VALUE - 1000, newState); + } + +}