RedissonAtomicLong added

pull/6/head
Nikita 11 years ago
parent 5646974fad
commit 5ec0d29705

@ -16,11 +16,13 @@
package org.redisson; package org.redisson;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RTopic; import org.redisson.core.RTopic;
import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisClient;
@ -31,6 +33,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class Redisson { public class Redisson {
// TODO drain after some time // TODO drain after some time
private final ConcurrentMap<String, RedissonAtomicLong> atomicLongsMap = new ConcurrentHashMap<String, RedissonAtomicLong>();
private final ConcurrentMap<String, RedissonQueue> queuesMap = new ConcurrentHashMap<String, RedissonQueue>(); private final ConcurrentMap<String, RedissonQueue> queuesMap = new ConcurrentHashMap<String, RedissonQueue>();
private final ConcurrentMap<String, RedissonTopic> topicsMap = new ConcurrentHashMap<String, RedissonTopic>(); private final ConcurrentMap<String, RedissonTopic> topicsMap = new ConcurrentHashMap<String, RedissonTopic>();
private final ConcurrentMap<String, RedissonSet> setsMap = new ConcurrentHashMap<String, RedissonSet>(); private final ConcurrentMap<String, RedissonSet> setsMap = new ConcurrentHashMap<String, RedissonSet>();
@ -147,7 +150,7 @@ public class Redisson {
} }
public <V> RedissonQueue<V> getQueue(String name) { public <V> Queue<V> getQueue(String name) {
RedissonQueue<V> queue = queuesMap.get(name); RedissonQueue<V> queue = queuesMap.get(name);
if (queue == null) { if (queue == null) {
RedisConnection<Object, Object> connection = connect(); RedisConnection<Object, Object> connection = connect();
@ -163,7 +166,20 @@ public class Redisson {
return queue; return queue;
} }
public void getAtomicLong() { public RAtomicLong getAtomicLong(String name) {
RedissonAtomicLong atomicLong = atomicLongsMap.get(name);
if (atomicLong == null) {
RedisConnection<Object, Object> connection = connect();
atomicLong = new RedissonAtomicLong(this, connection, name);
RedissonAtomicLong oldAtomicLong = atomicLongsMap.putIfAbsent(name, atomicLong);
if (oldAtomicLong != null) {
connection.close();
atomicLong = oldAtomicLong;
}
}
return atomicLong;
} }

@ -0,0 +1,110 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.core.RAtomicLong;
import com.lambdaworks.redis.RedisConnection;
public class RedissonAtomicLong implements RAtomicLong {
private final RedisConnection<Object, Object> connection;
private final String name;
private final Redisson redisson;
RedissonAtomicLong(Redisson redisson, RedisConnection<Object, Object> connection, String name) {
this.redisson = redisson;
this.connection = connection;
this.name = name;
}
@Override
public long addAndGet(long delta) {
return connection.incrby(name, delta);
}
@Override
public boolean compareAndSet(long expect, long update) {
RedisConnection<Object, Object> conn = redisson.connect();
try {
while (true) {
conn.watch(name);
Long value = (Long) conn.get(name);
if (value != expect) {
conn.discard();
return false;
}
conn.multi();
conn.set(name, update);
if (conn.exec().size() == 1) {
return true;
}
}
} finally {
conn.close();
}
}
@Override
public long decrementAndGet() {
return connection.decr(name);
}
@Override
public long get() {
return (Long) connection.get(name);
}
@Override
public long getAndAdd(long delta) {
while (true) {
long current = get();
long next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
@Override
public long getAndSet(long newValue) {
return (Long) connection.getset(name, newValue);
}
@Override
public long incrementAndGet() {
return connection.incr(name);
}
@Override
public long getAndIncrement() {
return getAndAdd(1);
}
public long getAndDecrement() {
return getAndAdd(-1);
}
@Override
public void set(long newValue) {
connection.set(name, newValue);
}
public String toString() {
return Long.toString(get());
}
}

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson; package org.redisson;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;

@ -0,0 +1,92 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public interface RAtomicLong {
long getAndDecrement();
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
long addAndGet(long delta);
/**
* Atomically sets the value to the given updated value
* only if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful; or false if the actual value
* was not equal to the expected value.
*/
boolean compareAndSet(long expect, long update);
/**
* Atomically decrements the current value by one.
*
* @return the updated value
*/
long decrementAndGet();
/**
* Gets the current value.
*
* @return the current value
*/
long get();
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the old value before the add
*/
long getAndAdd(long delta);
/**
* Atomically sets the given value and returns the old value.
*
* @param newValue the new value
* @return the old value
*/
long getAndSet(long newValue);
/**
* Atomically increments the current value by one.
*
* @return the updated value
*/
long incrementAndGet();
/**
* Atomically increments the current value by one.
*
* @return the old value
*/
long getAndIncrement();
/**
* Atomically sets the given value.
*
* @param newValue the new value
*/
void set(long newValue);
}
Loading…
Cancel
Save