From 0513bb2ea228d9cf7b1f93160aeae25089184d43 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 6 Jan 2014 13:40:16 +0400 Subject: [PATCH] RObject introduced --- src/main/java/org/redisson/Redisson.java | 22 ++++++++-- src/main/java/org/redisson/RedissonLock.java | 41 ++++++++++++++----- src/main/java/org/redisson/core/RObject.java | 33 +++++++++++++++ .../misc/internal/ThreadLocalSemaphore.java | 15 +++++++ .../java/org/redisson/RedissonLockTest.java | 21 ++++++++++ 5 files changed, 117 insertions(+), 15 deletions(-) create mode 100644 src/main/java/org/redisson/core/RObject.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index bf1f363d6..10a0c7bab 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -24,16 +24,19 @@ import java.util.concurrent.locks.Lock; import org.redisson.core.RAtomicLong; import org.redisson.core.RCountDownLatch; +import org.redisson.core.RObject; import org.redisson.core.RTopic; +import com.lambdaworks.redis.RedisAsyncConnection; import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.codec.JsonCodec; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; +// TODO lazy connection public class Redisson { - // TODO drain after some time + // TODO drain by weak reference private final ConcurrentMap latchesMap = new ConcurrentHashMap(); private final ConcurrentMap atomicLongsMap = new ConcurrentHashMap(); private final ConcurrentMap queuesMap = new ConcurrentHashMap(); @@ -101,7 +104,7 @@ public class Redisson { RedisConnection connection = connect(); RedisPubSubConnection pubSubConnection = connectPubSub(); - lock = new RedissonLock(pubSubConnection, connection, name); + lock = new RedissonLock(this, pubSubConnection, connection, name); RedissonLock oldLock = locksMap.putIfAbsent(name, lock); if (oldLock != null) { connection.close(); @@ -209,8 +212,14 @@ public class Redisson { return redisClient.connectPubSub(codec); } - public void getSemaphore() { - // TODO implement + // TODO implement +// public void getSemaphore() { +// } + + void remove(RObject robject) { + if (robject instanceof RedissonLock) { + locksMap.remove(robject.getName()); + } } public void shutdown() { @@ -220,5 +229,10 @@ public class Redisson { RedisConnection connect() { return redisClient.connect(codec); } + + RedisAsyncConnection connectAsync() { + return redisClient.connectAsync(codec); + } + } diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 1a70bd2f4..68347f4de 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -22,26 +22,31 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import org.redisson.core.RObject; + import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; -public class RedissonLock implements Lock { +// TODO make it reentrant +public class RedissonLock implements Lock, RObject { - private final CountDownLatch subscribeLatch = new CountDownLatch(1); + private final Redisson redisson; private final RedisPubSubConnection pubSubConnection; private final RedisConnection connection; - private final String groupName = "redisson_lock_"; + private final String groupName = "redisson_lock"; private final String name; private static final Integer unlockMessage = 0; + private final CountDownLatch subscribeLatch = new CountDownLatch(1); private final AtomicBoolean subscribeOnce = new AtomicBoolean(); private final Semaphore msg = new Semaphore(1); - RedissonLock(RedisPubSubConnection pubSubConnection, RedisConnection connection, String name) { + RedissonLock(Redisson redisson, RedisPubSubConnection pubSubConnection, RedisConnection connection, String name) { + this.redisson = redisson; this.pubSubConnection = pubSubConnection; this.connection = connection; this.name = name; @@ -49,6 +54,8 @@ public class RedissonLock implements Lock { public void subscribe() { if (subscribeOnce.compareAndSet(false, true)) { + msg.acquireUninterruptibly(); + RedisPubSubAdapter listener = new RedisPubSubAdapter() { @Override @@ -107,17 +114,14 @@ public class RedissonLock implements Lock { public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { time = unit.toMillis(time); while (!tryLock()) { + if (time <= 0) { + return false; + } long current = System.currentTimeMillis(); // waiting for message - boolean res = msg.tryAcquire(time, TimeUnit.MILLISECONDS); - if (res) { - return true; - } + msg.tryAcquire(time, TimeUnit.MILLISECONDS); long elapsed = System.currentTimeMillis() - current; time -= elapsed; - if (time <= 0) { - return false; - } } return true; } @@ -133,4 +137,19 @@ public class RedissonLock implements Lock { throw new UnsupportedOperationException(); } + @Override + public String getName() { + return name; + } + + @Override + public void destroy() { + pubSubConnection.unsubscribe(getChannelName()); + + connection.close(); + pubSubConnection.close(); + + redisson.remove(this); + } + } diff --git a/src/main/java/org/redisson/core/RObject.java b/src/main/java/org/redisson/core/RObject.java new file mode 100644 index 000000000..ac0585856 --- /dev/null +++ b/src/main/java/org/redisson/core/RObject.java @@ -0,0 +1,33 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public interface RObject { + + /** + * Returns name of distributed object + * + * @return name + */ + String getName(); + + /** + * Closes connections and releases resources engaged by this object + * + */ + void destroy(); + +} diff --git a/src/main/java/org/redisson/misc/internal/ThreadLocalSemaphore.java b/src/main/java/org/redisson/misc/internal/ThreadLocalSemaphore.java index a6b90efe1..bddfdeceb 100644 --- a/src/main/java/org/redisson/misc/internal/ThreadLocalSemaphore.java +++ b/src/main/java/org/redisson/misc/internal/ThreadLocalSemaphore.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.misc.internal; import java.util.Collection; diff --git a/src/test/java/org/redisson/RedissonLockTest.java b/src/test/java/org/redisson/RedissonLockTest.java index 638431a26..8c2eeb45d 100644 --- a/src/test/java/org/redisson/RedissonLockTest.java +++ b/src/test/java/org/redisson/RedissonLockTest.java @@ -17,6 +17,8 @@ public class RedissonLockTest extends BaseConcurrentTest { lock.lock(); lock.unlock(); + + redisson.shutdown(); } @Test @@ -37,6 +39,25 @@ public class RedissonLockTest extends BaseConcurrentTest { Assert.assertEquals(iterations, lockedCounter.get()); } + @Test + public void testConcurrencyLoop_MultiInstance() throws InterruptedException { + final int iterations = 100; + final AtomicInteger lockedCounter = new AtomicInteger(); + + testMultiInstanceConcurrency(16, new RedissonRunnable() { + @Override + public void run(Redisson redisson) { + for (int i = 0; i < iterations; i++) { + redisson.getLock("testConcurrency_MultiInstance").lock(); + lockedCounter.set(lockedCounter.get() + 1); + redisson.getLock("testConcurrency_MultiInstance").unlock(); + } + } + }); + + Assert.assertEquals(16 * iterations, lockedCounter.get()); + } + @Test public void testConcurrency_MultiInstance() throws InterruptedException { int iterations = 100;