Useless Redisson.locksMap removed

pull/38/head
Nikita 11 years ago
parent 6250f41c55
commit eb3915f5bc

@ -16,7 +16,6 @@
package org.redisson; package org.redisson;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
@ -34,9 +33,6 @@ import org.redisson.core.RQueue;
import org.redisson.core.RSet; import org.redisson.core.RSet;
import org.redisson.core.RSortedSet; import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic; import org.redisson.core.RTopic;
import org.redisson.misc.ReferenceMap;
import org.redisson.misc.ReferenceMap.ReferenceType;
import org.redisson.misc.ReferenceMap.RemoveValueListener;
import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.RedisConnection;
@ -49,19 +45,6 @@ import com.lambdaworks.redis.RedisConnection;
*/ */
public class Redisson { public class Redisson {
private final RemoveValueListener listener = new RemoveValueListener() {
@Override
public void onRemove(Object value) {
if (value instanceof RedissonObject) {
((RedissonObject)value).close();
}
}
};
private final ConcurrentMap<String, RedissonLock> locksMap = new ReferenceMap<String, RedissonLock>(ReferenceType.STRONG, ReferenceType.WEAK, listener);
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Config config; private final Config config;
@ -137,16 +120,7 @@ public class Redisson {
* @return distributed lock * @return distributed lock
*/ */
public RLock getLock(String name) { public RLock getLock(String name) {
RedissonLock lock = locksMap.get(name); return new RedissonLock(connectionManager, name, id);
if (lock == null) {
lock = new RedissonLock(connectionManager, name, id);
RedissonLock oldLock = locksMap.putIfAbsent(name, lock);
if (oldLock != null) {
lock = oldLock;
}
}
return lock;
} }
/** /**

@ -68,6 +68,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
if (oldPromise == null) { if (oldPromise == null) {
return subscribe(); return subscribe();
} }
return oldPromise;
} }
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() { RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
@ -248,6 +249,29 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
} }
@Override
public void delete() {
Future<Boolean> promise = subscribe();
try {
promise.awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionWriteOp();
try {
connection.multi();
connection.del(getName());
connection.publish(getChannelName(), zeroCountMessage);
if (connection.exec().size() != 2) {
throw new IllegalStateException();
}
} finally {
connectionManager.releaseWrite(connection);
}
} finally {
close();
ENTRIES.remove(getName());
}
}
public void close() { public void close() {
release(); release();

@ -20,9 +20,9 @@ import io.netty.util.concurrent.Promise;
import java.io.Serializable; import java.io.Serializable;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Semaphore; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -108,10 +108,7 @@ public class RedissonLock extends RedissonObject implements RLock {
private static final Integer unlockMessage = 0; private static final Integer unlockMessage = 0;
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>(); private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = new ConcurrentHashMap<String, RedissonLockEntry>();
// TODO use lazy init map
private final Semaphore msg = new Semaphore(1);
private PubSubConnectionEntry pubSubEntry; private PubSubConnectionEntry pubSubEntry;
@ -120,32 +117,65 @@ public class RedissonLock extends RedissonObject implements RLock {
this.id = id; this.id = id;
} }
private void release() {
while (true) {
RedissonLockEntry entry = ENTRIES.get(getName());
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.release();
if (ENTRIES.replace(getName(), entry, newEntry)) {
return;
}
}
}
private Promise<Boolean> aquire() {
while (true) {
RedissonLockEntry entry = ENTRIES.get(getName());
if (entry != null) {
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getName(), entry, newEntry)) {
return newEntry.getPromise();
}
} else {
return null;
}
}
}
private Future<Boolean> subscribe() { private Future<Boolean> subscribe() {
Promise<Boolean> p = promise.get(); Promise<Boolean> promise = aquire();
if (p != null) { if (promise != null) {
return p; return promise;
} }
final Promise<Boolean> newPromise = newPromise(); Promise<Boolean> newPromise = newPromise();
if (!promise.compareAndSet(null, newPromise)) { final RedissonLockEntry value = new RedissonLockEntry(newPromise);
return promise.get(); value.aquire();
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
return subscribe();
}
return oldPromise;
} }
msg.acquireUninterruptibly(); value.getLatch().acquireUninterruptibly();
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() { RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
@Override @Override
public void subscribed(String channel, long count) { public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) { if (getChannelName().equals(channel)) {
newPromise.setSuccess(true); value.getPromise().setSuccess(true);
} }
} }
@Override @Override
public void message(String channel, Integer message) { public void message(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) { if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
msg.release(); value.getLatch().release();
} }
} }
@ -157,8 +187,6 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override @Override
public void lock() { public void lock() {
subscribe().awaitUninterruptibly();
try { try {
lockInterruptibly(); lockInterruptibly();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -177,17 +205,17 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override @Override
public void lockInterruptibly() throws InterruptedException { public void lockInterruptibly() throws InterruptedException {
subscribe().awaitUninterruptibly();
while (!tryLock()) { while (!tryLock()) {
// waiting for message // waiting for message
msg.acquire(); ENTRIES.get(getName()).getLatch().acquire();
} }
} }
@Override @Override
public boolean tryLock() { public boolean tryLock() {
subscribe().awaitUninterruptibly(); Future<Boolean> promise = subscribe();
try {
promise.awaitUninterruptibly();
LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter(); currentLock.incCounter();
@ -207,11 +235,16 @@ public class RedissonLock extends RedissonObject implements RLock {
} finally { } finally {
connectionManager.releaseWrite(connection); connectionManager.releaseWrite(connection);
} }
} finally {
close();
}
} }
@Override @Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (!subscribe().awaitUninterruptibly(time, unit)) { Future<Boolean> promise = subscribe();
try {
if (!promise.awaitUninterruptibly(time, unit)) {
return false; return false;
} }
@ -222,22 +255,26 @@ public class RedissonLock extends RedissonObject implements RLock {
} }
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
// waiting for message // waiting for message
msg.tryAcquire(time, TimeUnit.MILLISECONDS); ENTRIES.get(getName()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time -= elapsed; time -= elapsed;
} }
return true; return true;
} finally {
close();
}
} }
@Override @Override
public void unlock() { public void unlock() {
subscribe().awaitUninterruptibly(); Future<Boolean> promise = subscribe();
try {
LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); promise.awaitUninterruptibly();
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp(); RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
try { try {
LockValue lock = (LockValue) connection.get(getKeyName()); LockValue lock = (LockValue) connection.get(getKeyName());
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock != null && lock.equals(currentLock)) { if (lock != null && lock.equals(currentLock)) {
if (lock.getCounter() > 1) { if (lock.getCounter() > 1) {
lock.decCounter(); lock.decCounter();
@ -246,12 +283,16 @@ public class RedissonLock extends RedissonObject implements RLock {
unlock(connection); unlock(connection);
} }
} else { } else {
throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: " // could be deleted
+ id + " thread-id: " + Thread.currentThread().getId()); // throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "
// + id + " thread-id: " + Thread.currentThread().getId());
} }
} finally { } finally {
connectionManager.releaseWrite(connection); connectionManager.releaseWrite(connection);
} }
} finally {
close();
}
} }
private void unlock(RedisConnection<Object, Object> connection) { private void unlock(RedisConnection<Object, Object> connection) {
@ -275,31 +316,31 @@ public class RedissonLock extends RedissonObject implements RLock {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void close() {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
}
@Override @Override
public void forceUnlock() { public void forceUnlock() {
subscribe().awaitUninterruptibly(); Future<Boolean> promise = subscribe();
try {
promise.awaitUninterruptibly();
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp(); RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
try { try {
while (true) {
LockValue lock = (LockValue) connection.get(getKeyName()); LockValue lock = (LockValue) connection.get(getKeyName());
if (lock != null) { if (lock != null) {
unlock(connection); unlock(connection);
} }
}
} finally { } finally {
connectionManager.releaseWrite(connection); connectionManager.releaseWrite(connection);
} }
} finally {
close();
}
} }
@Override @Override
public boolean isLocked() { public boolean isLocked() {
subscribe().awaitUninterruptibly(); Future<Boolean> promise = subscribe();
try {
promise.awaitUninterruptibly();
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp(); RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try { try {
@ -308,32 +349,40 @@ public class RedissonLock extends RedissonObject implements RLock {
} finally { } finally {
connectionManager.releaseRead(connection); connectionManager.releaseRead(connection);
} }
} finally {
close();
}
} }
@Override @Override
public boolean isHeldByCurrentThread() { public boolean isHeldByCurrentThread() {
subscribe().awaitUninterruptibly(); Future<Boolean> promise = subscribe();
try {
LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); promise.awaitUninterruptibly();
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp(); RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try { try {
LockValue lock = (LockValue) connection.get(getKeyName()); LockValue lock = (LockValue) connection.get(getKeyName());
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
return lock != null && lock.equals(currentLock); return lock != null && lock.equals(currentLock);
} finally { } finally {
connectionManager.releaseRead(connection); connectionManager.releaseRead(connection);
} }
} finally {
close();
}
} }
@Override @Override
public int getHoldCount() { public int getHoldCount() {
subscribe().awaitUninterruptibly(); Future<Boolean> promise = subscribe();
try {
LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); promise.awaitUninterruptibly();
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp(); RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try { try {
LockValue lock = (LockValue) connection.get(getKeyName()); LockValue lock = (LockValue) connection.get(getKeyName());
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock != null && lock.equals(currentLock)) { if (lock != null && lock.equals(currentLock)) {
return lock.getCounter(); return lock.getCounter();
} }
@ -341,11 +390,30 @@ public class RedissonLock extends RedissonObject implements RLock {
} finally { } finally {
connectionManager.releaseRead(connection); connectionManager.releaseRead(connection);
} }
} finally {
close();
}
} }
@Override @Override
public void delete() { public void delete() {
super.delete(getKeyName()); forceUnlock();
ENTRIES.remove(getName());
}
public void close() {
release();
connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
RedissonLockEntry entry = ENTRIES.get(getName());
if (entry.isFree()
&& ENTRIES.remove(getName(), entry)) {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
}
}
}, 15, TimeUnit.SECONDS);
} }
} }

@ -0,0 +1,83 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.Semaphore;
public class RedissonLockEntry {
private int counter;
private final Semaphore latch;
private final Promise<Boolean> promise;
public RedissonLockEntry(RedissonLockEntry source) {
counter = source.counter;
latch = source.latch;
promise = source.promise;
}
public RedissonLockEntry(Promise<Boolean> promise) {
super();
this.latch = new Semaphore(1);
this.promise = promise;
}
public boolean isFree() {
return counter == 0;
}
public void aquire() {
counter++;
}
public void release() {
counter--;
}
public Promise<Boolean> getPromise() {
return promise;
}
public Semaphore getLatch() {
return latch;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + counter;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RedissonLockEntry other = (RedissonLockEntry) obj;
if (counter != other.counter)
return false;
return true;
}
}

@ -151,7 +151,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
public void testConcurrency_SingleInstance() throws InterruptedException { public void testConcurrency_SingleInstance() throws InterruptedException {
final AtomicInteger lockedCounter = new AtomicInteger(); final AtomicInteger lockedCounter = new AtomicInteger();
int iterations = 100; int iterations = 15;
testSingleInstanceConcurrency(iterations, new RedissonRunnable() { testSingleInstanceConcurrency(iterations, new RedissonRunnable() {
@Override @Override
public void run(Redisson redisson) { public void run(Redisson redisson) {
@ -174,9 +174,9 @@ public class RedissonLockTest extends BaseConcurrentTest {
@Override @Override
public void run(Redisson redisson) { public void run(Redisson redisson) {
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
redisson.getLock("testConcurrency_MultiInstance").lock(); redisson.getLock("testConcurrency_MultiInstance1").lock();
lockedCounter.set(lockedCounter.get() + 1); lockedCounter.set(lockedCounter.get() + 1);
redisson.getLock("testConcurrency_MultiInstance").unlock(); redisson.getLock("testConcurrency_MultiInstance1").unlock();
} }
} }
}); });
@ -192,7 +192,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
testMultiInstanceConcurrency(iterations, new RedissonRunnable() { testMultiInstanceConcurrency(iterations, new RedissonRunnable() {
@Override @Override
public void run(Redisson redisson) { public void run(Redisson redisson) {
Lock lock = redisson.getLock("testConcurrency_MultiInstance"); Lock lock = redisson.getLock("testConcurrency_MultiInstance2");
lock.lock(); lock.lock();
lockedCounter.set(lockedCounter.get() + 1); lockedCounter.set(lockedCounter.get() + 1);
lock.unlock(); lock.unlock();

Loading…
Cancel
Save