RObject introduced

pull/6/head
Nikita 11 years ago
parent 461a0128fb
commit 0513bb2ea2

@ -24,16 +24,19 @@ import java.util.concurrent.locks.Lock;
import org.redisson.core.RAtomicLong; import org.redisson.core.RAtomicLong;
import org.redisson.core.RCountDownLatch; import org.redisson.core.RCountDownLatch;
import org.redisson.core.RObject;
import org.redisson.core.RTopic; import org.redisson.core.RTopic;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.JsonCodec; import com.lambdaworks.redis.codec.JsonCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
// TODO lazy connection
public class Redisson { public class Redisson {
// TODO drain after some time // TODO drain by weak reference
private final ConcurrentMap<String, RedissonCountDownLatch> latchesMap = new ConcurrentHashMap<String, RedissonCountDownLatch>(); private final ConcurrentMap<String, RedissonCountDownLatch> latchesMap = new ConcurrentHashMap<String, RedissonCountDownLatch>();
private final ConcurrentMap<String, RedissonAtomicLong> atomicLongsMap = new ConcurrentHashMap<String, RedissonAtomicLong>(); private final ConcurrentMap<String, RedissonAtomicLong> atomicLongsMap = new ConcurrentHashMap<String, RedissonAtomicLong>();
private final ConcurrentMap<String, RedissonQueue> queuesMap = new ConcurrentHashMap<String, RedissonQueue>(); private final ConcurrentMap<String, RedissonQueue> queuesMap = new ConcurrentHashMap<String, RedissonQueue>();
@ -101,7 +104,7 @@ public class Redisson {
RedisConnection<Object, Object> connection = connect(); RedisConnection<Object, Object> connection = connect();
RedisPubSubConnection<Object, Object> pubSubConnection = connectPubSub(); RedisPubSubConnection<Object, Object> pubSubConnection = connectPubSub();
lock = new RedissonLock(pubSubConnection, connection, name); lock = new RedissonLock(this, pubSubConnection, connection, name);
RedissonLock oldLock = locksMap.putIfAbsent(name, lock); RedissonLock oldLock = locksMap.putIfAbsent(name, lock);
if (oldLock != null) { if (oldLock != null) {
connection.close(); connection.close();
@ -209,8 +212,14 @@ public class Redisson {
return redisClient.connectPubSub(codec); return redisClient.connectPubSub(codec);
} }
public void getSemaphore() { // TODO implement
// TODO implement // public void getSemaphore() {
// }
void remove(RObject robject) {
if (robject instanceof RedissonLock) {
locksMap.remove(robject.getName());
}
} }
public void shutdown() { public void shutdown() {
@ -220,5 +229,10 @@ public class Redisson {
RedisConnection<Object, Object> connect() { RedisConnection<Object, Object> connect() {
return redisClient.connect(codec); return redisClient.connect(codec);
} }
RedisAsyncConnection<Object, Object> connectAsync() {
return redisClient.connectAsync(codec);
}
} }

@ -22,26 +22,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.redisson.core.RObject;
import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class RedissonLock implements Lock { // TODO make it reentrant
public class RedissonLock implements Lock, RObject {
private final CountDownLatch subscribeLatch = new CountDownLatch(1); private final Redisson redisson;
private final RedisPubSubConnection<Object, Object> pubSubConnection; private final RedisPubSubConnection<Object, Object> pubSubConnection;
private final RedisConnection<Object, Object> connection; private final RedisConnection<Object, Object> connection;
private final String groupName = "redisson_lock_"; private final String groupName = "redisson_lock";
private final String name; private final String name;
private static final Integer unlockMessage = 0; private static final Integer unlockMessage = 0;
private final CountDownLatch subscribeLatch = new CountDownLatch(1);
private final AtomicBoolean subscribeOnce = new AtomicBoolean(); private final AtomicBoolean subscribeOnce = new AtomicBoolean();
private final Semaphore msg = new Semaphore(1); private final Semaphore msg = new Semaphore(1);
RedissonLock(RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, String name) { RedissonLock(Redisson redisson, RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, String name) {
this.redisson = redisson;
this.pubSubConnection = pubSubConnection; this.pubSubConnection = pubSubConnection;
this.connection = connection; this.connection = connection;
this.name = name; this.name = name;
@ -49,6 +54,8 @@ public class RedissonLock implements Lock {
public void subscribe() { public void subscribe() {
if (subscribeOnce.compareAndSet(false, true)) { if (subscribeOnce.compareAndSet(false, true)) {
msg.acquireUninterruptibly();
RedisPubSubAdapter<Object, Object> listener = new RedisPubSubAdapter<Object, Object>() { RedisPubSubAdapter<Object, Object> listener = new RedisPubSubAdapter<Object, Object>() {
@Override @Override
@ -107,17 +114,14 @@ public class RedissonLock implements Lock {
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
time = unit.toMillis(time); time = unit.toMillis(time);
while (!tryLock()) { while (!tryLock()) {
if (time <= 0) {
return false;
}
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
// waiting for message // waiting for message
boolean res = msg.tryAcquire(time, TimeUnit.MILLISECONDS); msg.tryAcquire(time, TimeUnit.MILLISECONDS);
if (res) {
return true;
}
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time -= elapsed; time -= elapsed;
if (time <= 0) {
return false;
}
} }
return true; return true;
} }
@ -133,4 +137,19 @@ public class RedissonLock implements Lock {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public String getName() {
return name;
}
@Override
public void destroy() {
pubSubConnection.unsubscribe(getChannelName());
connection.close();
pubSubConnection.close();
redisson.remove(this);
}
} }

@ -0,0 +1,33 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public interface RObject {
/**
* Returns name of distributed object
*
* @return name
*/
String getName();
/**
* Closes connections and releases resources engaged by this object
*
*/
void destroy();
}

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc.internal; package org.redisson.misc.internal;
import java.util.Collection; import java.util.Collection;

@ -17,6 +17,8 @@ public class RedissonLockTest extends BaseConcurrentTest {
lock.lock(); lock.lock();
lock.unlock(); lock.unlock();
redisson.shutdown();
} }
@Test @Test
@ -37,6 +39,25 @@ public class RedissonLockTest extends BaseConcurrentTest {
Assert.assertEquals(iterations, lockedCounter.get()); Assert.assertEquals(iterations, lockedCounter.get());
} }
@Test
public void testConcurrencyLoop_MultiInstance() throws InterruptedException {
final int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
testMultiInstanceConcurrency(16, new RedissonRunnable() {
@Override
public void run(Redisson redisson) {
for (int i = 0; i < iterations; i++) {
redisson.getLock("testConcurrency_MultiInstance").lock();
lockedCounter.set(lockedCounter.get() + 1);
redisson.getLock("testConcurrency_MultiInstance").unlock();
}
}
});
Assert.assertEquals(16 * iterations, lockedCounter.get());
}
@Test @Test
public void testConcurrency_MultiInstance() throws InterruptedException { public void testConcurrency_MultiInstance() throws InterruptedException {
int iterations = 100; int iterations = 100;

Loading…
Cancel
Save