RBucket added

pull/25/head
Nikita 11 years ago
parent e6f8c3cac0
commit 00d8cbe945

@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RBucket;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RHyperLogLog;
@ -57,7 +58,7 @@ public class Redisson {
};
private final ConcurrentMap<String, RedissonCountDownLatch> latchesMap = new ReferenceMap<String, RedissonCountDownLatch>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
private final ConcurrentMap<String, RedissonTopic> topicsMap = new ReferenceMap<String, RedissonTopic>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
private final ConcurrentMap<String, RedissonTopic> topicsMap = new ReferenceMap<String, RedissonTopic>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonLock> locksMap = new ReferenceMap<String, RedissonLock>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
private final ConcurrentMap<String, RedissonAtomicLong> atomicLongsMap = new ReferenceMap<String, RedissonAtomicLong>(ReferenceType.STRONG, ReferenceType.SOFT);
@ -67,6 +68,7 @@ public class Redisson {
private final ConcurrentMap<String, RedissonSortedSet> sortedSetMap = new ReferenceMap<String, RedissonSortedSet>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonList> listsMap = new ReferenceMap<String, RedissonList>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonHyperLogLog> hyperLogLogMap = new ReferenceMap<String, RedissonHyperLogLog>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonBucket> bucketMap = new ReferenceMap<String, RedissonBucket>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonMap> mapsMap = new ReferenceMap<String, RedissonMap>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConnectionManager connectionManager;
@ -101,6 +103,19 @@ public class Redisson {
return new Redisson(config);
}
public <V> RBucket<V> getBucket(String name) {
RedissonBucket<V> bucket = bucketMap.get(name);
if (bucket == null) {
bucket = new RedissonBucket<V>(connectionManager, name);
RedissonBucket<V> oldBucket = bucketMap.putIfAbsent(name, bucket);
if (oldBucket != null) {
bucket = oldBucket;
}
}
return bucket;
}
public <V> RHyperLogLog<V> getHyperLogLog(String name) {
RedissonHyperLogLog<V> logLog = hyperLogLogMap.get(name);
if (logLog == null) {

@ -0,0 +1,107 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RBucket;
import com.lambdaworks.redis.RedisConnection;
public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
RedissonBucket(ConnectionManager connectionManager, String name) {
super(connectionManager, name);
}
@Override
public V get() {
RedisConnection<String, V> conn = connectionManager.connectionReadOp();
try {
return conn.get(getName());
} finally {
connectionManager.release(conn);
}
}
@Override
public Future<V> getAsync() {
RedisConnection<String, V> conn = connectionManager.connectionReadOp();
return conn.getAsync().get(getName()).addListener(connectionManager.createReleaseListener(conn));
}
@Override
public void set(V value) {
RedisConnection<String, V> conn = connectionManager.connectionWriteOp();
try {
conn.set(getName(), value);
} finally {
connectionManager.release(conn);
}
}
@Override
public Future<Void> setAsync(V value) {
RedisConnection<Object, V> connection = connectionManager.connectionWriteOp();
Promise<Void> promise = connectionManager.getGroup().next().newPromise();
Future<String> f = connection.getAsync().set(getName(), value);
addListener(f, promise);
promise.addListener(connectionManager.createReleaseListener(connection));
return promise;
}
@Override
public void set(V value, long timeToLive, TimeUnit timeUnit) {
RedisConnection<String, V> conn = connectionManager.connectionWriteOp();
try {
conn.setex(getName(), timeUnit.toSeconds(timeToLive), value);
} finally {
connectionManager.release(conn);
}
}
private void addListener(Future<String> future, final Promise<Void> promise) {
future.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (promise.isCancelled()) {
return;
}
if (future.isSuccess()) {
promise.setSuccess(null);
} else {
promise.setFailure(promise.cause());
}
}
});
}
@Override
public Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
RedisConnection<Object, V> connection = connectionManager.connectionWriteOp();
Promise<Void> promise = connectionManager.getGroup().next().newPromise();
Future<String> f = connection.getAsync().setex(getName(), timeUnit.toSeconds(timeToLive), value);
addListener(f, promise);
promise.addListener(connectionManager.createReleaseListener(connection));
return promise;
}
}

@ -286,7 +286,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> putAsync(K key, V value) {
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
RedisConnection<Object, V> connection = connectionManager.connectionWriteOp();
Promise<V> promise = connectionManager.getGroup().next().newPromise();
RedisAsyncConnection<Object, V> async = connection.getAsync();
putAsync(key, value, promise, async);
@ -372,7 +372,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<V> removeAsync(K key) {
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
RedisConnection<Object, V> connection = connectionManager.connectionWriteOp();
Promise<V> promise = connectionManager.getGroup().next().newPromise();
RedisAsyncConnection<Object, V> async = connection.getAsync();
removeAsync(key, promise, async);

@ -0,0 +1,36 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import io.netty.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public interface RBucket<V> extends RObject {
V get();
Future<V> getAsync();
void set(V value);
Future<Void> setAsync(V value);
void set(V value, long timeToLive, TimeUnit timeUnit);
Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit);
}
Loading…
Cancel
Save