diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index d0c30def8..c4df8c661 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -16,11 +16,13 @@ package org.redisson; import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; +import org.redisson.core.RAtomicLong; import org.redisson.core.RTopic; import com.lambdaworks.redis.RedisClient; @@ -31,6 +33,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; public class Redisson { // TODO drain after some time + private final ConcurrentMap atomicLongsMap = new ConcurrentHashMap(); private final ConcurrentMap queuesMap = new ConcurrentHashMap(); private final ConcurrentMap topicsMap = new ConcurrentHashMap(); private final ConcurrentMap setsMap = new ConcurrentHashMap(); @@ -147,7 +150,7 @@ public class Redisson { } - public RedissonQueue getQueue(String name) { + public Queue getQueue(String name) { RedissonQueue queue = queuesMap.get(name); if (queue == null) { RedisConnection connection = connect(); @@ -163,7 +166,20 @@ public class Redisson { return queue; } - public void getAtomicLong() { + public RAtomicLong getAtomicLong(String name) { + RedissonAtomicLong atomicLong = atomicLongsMap.get(name); + if (atomicLong == null) { + RedisConnection connection = connect(); + atomicLong = new RedissonAtomicLong(this, connection, name); + RedissonAtomicLong oldAtomicLong = atomicLongsMap.putIfAbsent(name, atomicLong); + if (oldAtomicLong != null) { + connection.close(); + + atomicLong = oldAtomicLong; + } + } + + return atomicLong; } diff --git a/src/main/java/org/redisson/RedissonAtomicLong.java b/src/main/java/org/redisson/RedissonAtomicLong.java new file mode 100644 index 000000000..be06e1ad8 --- /dev/null +++ b/src/main/java/org/redisson/RedissonAtomicLong.java @@ -0,0 +1,110 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import org.redisson.core.RAtomicLong; + +import com.lambdaworks.redis.RedisConnection; + +public class RedissonAtomicLong implements RAtomicLong { + + private final RedisConnection connection; + private final String name; + private final Redisson redisson; + + RedissonAtomicLong(Redisson redisson, RedisConnection connection, String name) { + this.redisson = redisson; + this.connection = connection; + this.name = name; + } + + + @Override + public long addAndGet(long delta) { + return connection.incrby(name, delta); + } + + @Override + public boolean compareAndSet(long expect, long update) { + RedisConnection conn = redisson.connect(); + try { + while (true) { + conn.watch(name); + Long value = (Long) conn.get(name); + if (value != expect) { + conn.discard(); + return false; + } + conn.multi(); + conn.set(name, update); + if (conn.exec().size() == 1) { + return true; + } + } + } finally { + conn.close(); + } + } + + @Override + public long decrementAndGet() { + return connection.decr(name); + } + + @Override + public long get() { + return (Long) connection.get(name); + } + + @Override + public long getAndAdd(long delta) { + while (true) { + long current = get(); + long next = current + delta; + if (compareAndSet(current, next)) + return current; + } + } + + @Override + public long getAndSet(long newValue) { + return (Long) connection.getset(name, newValue); + } + + @Override + public long incrementAndGet() { + return connection.incr(name); + } + + @Override + public long getAndIncrement() { + return getAndAdd(1); + } + + public long getAndDecrement() { + return getAndAdd(-1); + } + + @Override + public void set(long newValue) { + connection.set(name, newValue); + } + + public String toString() { + return Long.toString(get()); + } + +} diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index cc4e030ba..4bdff8c27 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson; import java.util.NoSuchElementException; diff --git a/src/main/java/org/redisson/core/RAtomicLong.java b/src/main/java/org/redisson/core/RAtomicLong.java new file mode 100644 index 000000000..af99add24 --- /dev/null +++ b/src/main/java/org/redisson/core/RAtomicLong.java @@ -0,0 +1,92 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public interface RAtomicLong { + + long getAndDecrement(); + + /** + * Atomically adds the given value to the current value. + * + * @param delta the value to add + * @return the updated value + */ + long addAndGet(long delta); + + /** + * Atomically sets the value to the given updated value + * only if the current value {@code ==} the expected value. + * + * @param expect the expected value + * @param update the new value + * @return true if successful; or false if the actual value + * was not equal to the expected value. + */ + boolean compareAndSet(long expect, long update); + + /** + * Atomically decrements the current value by one. + * + * @return the updated value + */ + long decrementAndGet(); + + /** + * Gets the current value. + * + * @return the current value + */ + long get(); + + /** + * Atomically adds the given value to the current value. + * + * @param delta the value to add + * @return the old value before the add + */ + long getAndAdd(long delta); + + /** + * Atomically sets the given value and returns the old value. + * + * @param newValue the new value + * @return the old value + */ + long getAndSet(long newValue); + + /** + * Atomically increments the current value by one. + * + * @return the updated value + */ + long incrementAndGet(); + + /** + * Atomically increments the current value by one. + * + * @return the old value + */ + long getAndIncrement(); + + /** + * Atomically sets the given value. + * + * @param newValue the new value + */ + void set(long newValue); + +}