RAtomicLongReactive added. #210

pull/337/head
Nikita 9 years ago
parent b480ea79f0
commit fcbdfd0dea

@ -0,0 +1,109 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Collections;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.core.RAtomicLongReactive;
import reactor.rx.Streams;
/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
*
* @author Nikita Koksharov
*
*/
public class RedissonAtomicLongReactive extends RedissonExpirableReactive implements RAtomicLongReactive {
protected RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@Override
public Publisher<Long> addAndGet(long delta) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.INCRBY, getName(), delta);
}
@Override
public Publisher<Boolean> compareAndSet(long expect, long update) {
return commandExecutor.evalWriteObservable(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return true "
+ "else "
+ "return false end",
Collections.<Object>singletonList(getName()), expect, update);
}
@Override
public Publisher<Long> decrementAndGet() {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName());
}
@Override
public Publisher<Long> get() {
return addAndGet(0);
}
@Override
public Publisher<Long> getAndAdd(long delta) {
return commandExecutor.evalWriteObservable(getName(),
StringCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local v = redis.call('get', KEYS[1]) or 0; "
+ "redis.call('set', KEYS[1], v + ARGV[1]); "
+ "return tonumber(v)",
Collections.<Object>singletonList(getName()), delta);
}
@Override
public Publisher<Long> getAndSet(long newValue) {
return commandExecutor.evalWriteObservable(getName(),
StringCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)",
Collections.<Object>singletonList(getName()), newValue);
}
@Override
public Publisher<Long> incrementAndGet() {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName());
}
@Override
public Publisher<Long> getAndIncrement() {
return getAndAdd(1);
}
@Override
public Publisher<Long> getAndDecrement() {
return getAndAdd(-1);
}
@Override
public Publisher<Void> set(long newValue) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), newValue);
}
public String toString() {
return Long.toString(Streams.create(get()).next().poll());
}
}

@ -29,6 +29,7 @@ import org.redisson.connection.ElasticacheConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RAtomicLongReactive;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive;
@ -214,6 +215,11 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonDequeReactive<V>(codec, commandExecutor, name);
}
@Override
public RAtomicLongReactive getAtomicLong(String name) {
return new RedissonAtomicLongReactive(commandExecutor, name);
}
public Config getConfig() {
return config;
}

@ -18,6 +18,7 @@ package org.redisson;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.RAtomicLongReactive;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive;
@ -165,14 +166,14 @@ public interface RedissonReactiveClient {
<V> RDequeReactive<V> getDeque(String name, Codec codec);
// /**
// * Returns "atomic long" instance by name.
// *
// * @param name of the "atomic long"
// * @return
// */
// RAtomicLong getAtomicLong(String name);
//
/**
* Returns "atomic long" instance by name.
*
* @param name of the "atomic long"
* @return
*/
RAtomicLongReactive getAtomicLong(String name);
// /**
// * Returns "count down latch" instance by name.
// *

@ -0,0 +1,42 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import org.reactivestreams.Publisher;
public interface RAtomicLongReactive extends RExpirableReactive {
Publisher<Boolean> compareAndSet(long expect, long update);
Publisher<Long> addAndGet(long delta);
Publisher<Long> decrementAndGet();
Publisher<Long> get();
Publisher<Long> getAndAdd(long delta);
Publisher<Long> getAndSet(long newValue);
Publisher<Long> incrementAndGet();
Publisher<Long> getAndIncrement();
Publisher<Long> getAndDecrement();
Publisher<Void> set(long newValue);
}

@ -0,0 +1,61 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RAtomicLongReactive;
public class RedissonAtomicLongReactiveTest extends BaseReactiveTest {
@Test
public void testCompareAndSet() {
RAtomicLongReactive al = redisson.getAtomicLong("test");
Assert.assertFalse(sync(al.compareAndSet(-1, 2)));
Assert.assertEquals(0, sync(al.get()).intValue());
Assert.assertTrue(sync(al.compareAndSet(0, 2)));
Assert.assertEquals(2, sync(al.get()).intValue());
}
@Test
public void testSetThenIncrement() {
RAtomicLongReactive al = redisson.getAtomicLong("test");
sync(al.set(2));
Assert.assertEquals(2, sync(al.getAndIncrement()).intValue());
Assert.assertEquals(3, sync(al.get()).intValue());
}
@Test
public void testIncrementAndGet() {
RAtomicLongReactive al = redisson.getAtomicLong("test");
Assert.assertEquals(1, sync(al.incrementAndGet()).intValue());
Assert.assertEquals(1, sync(al.get()).intValue());
}
@Test
public void testGetAndIncrement() {
RAtomicLongReactive al = redisson.getAtomicLong("test");
Assert.assertEquals(0, sync(al.getAndIncrement()).intValue());
Assert.assertEquals(1, sync(al.get()).intValue());
}
@Test
public void test() {
RAtomicLongReactive al = redisson.getAtomicLong("test");
Assert.assertEquals(0, sync(al.get()).intValue());
Assert.assertEquals(0, sync(al.getAndIncrement()).intValue());
Assert.assertEquals(1, sync(al.get()).intValue());
Assert.assertEquals(1, sync(al.getAndDecrement()).intValue());
Assert.assertEquals(0, sync(al.get()).intValue());
Assert.assertEquals(0, sync(al.getAndIncrement()).intValue());
Assert.assertEquals(1, sync(al.getAndSet(12)).intValue());
Assert.assertEquals(12, sync(al.get()).intValue());
sync(al.set(1));
long state = sync(redisson.getAtomicLong("test").get());
Assert.assertEquals(1, state);
sync(al.set(Long.MAX_VALUE - 1000));
long newState = sync(redisson.getAtomicLong("test").get());
Assert.assertEquals(Long.MAX_VALUE - 1000, newState);
}
}
Loading…
Cancel
Save