From b7643513fba8f10c51d1d45970ab6727975b6b5d Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 08:18:40 +0300 Subject: [PATCH] RAtomicDoubleReactive added --- .../org/redisson/RedissonAtomicDouble.java | 2 +- .../java/org/redisson/RedissonReactive.java | 7 + .../redisson/api/RAtomicDoubleReactive.java | 47 +++++++ .../org/redisson/api/RAtomicLongReactive.java | 5 + .../redisson/api/RedissonReactiveClient.java | 8 ++ .../RedissonAtomicDoubleReactive.java | 132 ++++++++++++++++++ .../reactive/RedissonAtomicLongReactive.java | 3 +- 7 files changed, 201 insertions(+), 3 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java index 23f55b45e..61ceba620 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java @@ -35,7 +35,7 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDouble { - protected RedissonAtomicDouble(CommandAsyncExecutor commandExecutor, String name) { + public RedissonAtomicDouble(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index c36970b24..188fe7ff8 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -24,6 +24,7 @@ import org.redisson.api.ClusterNode; import org.redisson.api.MapOptions; import org.redisson.api.Node; import org.redisson.api.NodesGroup; +import org.redisson.api.RAtomicDoubleReactive; import org.redisson.api.RAtomicLongReactive; import org.redisson.api.RBatchReactive; import org.redisson.api.RBitSetReactive; @@ -59,6 +60,7 @@ import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; import org.redisson.pubsub.SemaphorePubSub; +import org.redisson.reactive.RedissonAtomicDoubleReactive; import org.redisson.reactive.RedissonAtomicLongReactive; import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonBitSetReactive; @@ -302,6 +304,11 @@ public class RedissonReactive implements RedissonReactiveClient { public RAtomicLongReactive getAtomicLong(String name) { return new RedissonAtomicLongReactive(commandExecutor, name); } + + @Override + public RAtomicDoubleReactive getAtomicDouble(String name) { + return new RedissonAtomicDoubleReactive(commandExecutor, name); + } @Override public RBitSetReactive getBitSet(String name) { diff --git a/redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java b/redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java new file mode 100644 index 000000000..d93891ae8 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RAtomicDoubleReactive extends RExpirableReactive { + + Publisher compareAndSet(double expect, double update); + + Publisher addAndGet(double delta); + + Publisher decrementAndGet(); + + Publisher get(); + + Publisher getAndAdd(double delta); + + Publisher getAndSet(double newValue); + + Publisher incrementAndGet(); + + Publisher getAndIncrement(); + + Publisher getAndDecrement(); + + Publisher set(double newValue); + +} diff --git a/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java b/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java index 4bd448439..788a1802d 100644 --- a/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java +++ b/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java @@ -17,6 +17,11 @@ package org.redisson.api; import org.reactivestreams.Publisher; +/** + * + * @author Nikita Koksharov + * + */ public interface RAtomicLongReactive extends RExpirableReactive { Publisher compareAndSet(long expect, long update); diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index c47209e08..ad745e952 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -469,6 +469,14 @@ public interface RedissonReactiveClient { */ RAtomicLongReactive getAtomicLong(String name); + /** + * Returns "atomic double" instance by name. + * + * @param name of the "atomic double" + * @return AtomicLong object + */ + RAtomicDoubleReactive getAtomicDouble(String name); + /** * Returns bitSet instance by name. * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java new file mode 100644 index 000000000..ed5e31b4c --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java @@ -0,0 +1,132 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonAtomicDouble; +import org.redisson.api.RAtomicDoubleAsync; +import org.redisson.api.RAtomicDoubleReactive; +import org.redisson.api.RFuture; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.Supplier; + +/** + * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} + * + * @author Nikita Koksharov + * + */ +public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive implements RAtomicDoubleReactive { + + private final RAtomicDoubleAsync instance; + + public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + instance = new RedissonAtomicDouble(commandExecutor, name); + } + + @Override + public Publisher addAndGet(final double delta) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAndGetAsync(delta); + } + }); + } + + @Override + public Publisher compareAndSet(final double expect, final double update) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.compareAndSetAsync(expect, update); + } + }); + } + + @Override + public Publisher decrementAndGet() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.decrementAndGetAsync(); + } + }); + } + + @Override + public Publisher get() { + return addAndGet(0); + } + + @Override + public Publisher getAndAdd(final double delta) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getAndAddAsync(delta); + } + }); + } + + + @Override + public Publisher getAndSet(final double newValue) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getAndSetAsync(newValue); + } + }); + } + + @Override + public Publisher incrementAndGet() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.incrementAndGetAsync(); + } + }); + } + + @Override + public Publisher getAndIncrement() { + return getAndAdd(1); + } + + @Override + public Publisher getAndDecrement() { + return getAndAdd(-1); + } + + @Override + public Publisher set(final double newValue) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.setAsync(newValue); + } + }); + } + + public String toString() { + return instance.toString(); + } + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java index b0c625881..8aaa70f8e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java @@ -23,7 +23,6 @@ import org.redisson.api.RFuture; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; -import reactor.rx.Streams; /** * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} @@ -127,7 +126,7 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem } public String toString() { - return Long.toString(Streams.create(get()).next().poll()); + return instance.toString(); } }