Multi-thread lock fixed. #39

pull/38/merge
Nikita 11 years ago
parent f8d00c2842
commit 0cf87f2a75

@ -8,6 +8,7 @@ import static com.lambdaworks.redis.protocol.CommandType.SUBSCRIBE;
import static com.lambdaworks.redis.protocol.CommandType.UNSUBSCRIBE; import static com.lambdaworks.redis.protocol.CommandType.UNSUBSCRIBE;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.Collection; import java.util.Collection;
@ -88,8 +89,8 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
dispatch(SUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels)); dispatch(SUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
} }
public void unsubscribe(String... channels) { public Future<V> unsubscribe(String... channels) {
dispatch(UNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels)); return dispatch(UNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
} }
@Override @Override

@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.core.RLock; import org.redisson.core.RLock;
@ -108,6 +110,7 @@ public class RedissonLock extends RedissonObject implements RLock {
private static final Integer unlockMessage = 0; private static final Integer unlockMessage = 0;
private static final ReadWriteLock lock = new ReentrantReadWriteLock();
private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = new ConcurrentHashMap<String, RedissonLockEntry>(); private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = new ConcurrentHashMap<String, RedissonLockEntry>();
RedissonLock(ConnectionManager connectionManager, String name, UUID id) { RedissonLock(ConnectionManager connectionManager, String name, UUID id) {
@ -119,18 +122,34 @@ public class RedissonLock extends RedissonObject implements RLock {
while (true) { while (true) {
RedissonLockEntry entry = ENTRIES.get(getEntryName()); RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry == null) { if (entry == null) {
lock.readLock().unlock();
return; return;
} }
RedissonLockEntry newEntry = new RedissonLockEntry(entry); RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.release(); newEntry.release();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) { if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (!newEntry.isFree()) {
lock.readLock().unlock();
return;
}
lock.readLock().unlock();
lock.writeLock().lock();
try {
if (ENTRIES.remove(getEntryName(), newEntry)) {
Future future = connectionManager.unsubscribe(getChannelName());
future.awaitUninterruptibly();
}
} finally {
lock.writeLock().unlock();
}
return; return;
} }
} }
} }
private String getEntryName() { private String getEntryName() {
return id + getName(); return getName();
} }
private Promise<Boolean> aquire() { private Promise<Boolean> aquire() {
@ -149,6 +168,7 @@ public class RedissonLock extends RedissonObject implements RLock {
} }
private Future<Boolean> subscribe() { private Future<Boolean> subscribe() {
lock.readLock().lock();
Promise<Boolean> promise = aquire(); Promise<Boolean> promise = aquire();
if (promise != null) { if (promise != null) {
return promise; return promise;
@ -161,7 +181,11 @@ public class RedissonLock extends RedissonObject implements RLock {
if (oldValue != null) { if (oldValue != null) {
Promise<Boolean> oldPromise = aquire(); Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) { if (oldPromise == null) {
return subscribe(); try {
return subscribe();
} finally {
lock.readLock().unlock();
}
} }
return oldPromise; return oldPromise;
} }
@ -189,19 +213,19 @@ public class RedissonLock extends RedissonObject implements RLock {
connectionManager.subscribe(listener, getChannelName()); connectionManager.subscribe(listener, getChannelName());
RedisPubSubAdapter<Object> expireListener = new RedisPubSubAdapter<Object>() { // RedisPubSubAdapter<Object> expireListener = new RedisPubSubAdapter<Object>() {
//
@Override // @Override
public void message(String channel, Object message) { // public void message(String channel, Object message) {
if (getExpireChannelName().equals(channel) // if (getExpireChannelName().equals(channel)
&& "expired".equals(message)) { // && "expired".equals(message)) {
forceUnlock(); // forceUnlock();
} // }
} // }
//
}; // };
//
connectionManager.subscribe(expireListener, getExpireChannelName()); // connectionManager.subscribe(expireListener, getExpireChannelName());
return newPromise; return newPromise;
} }
@ -240,12 +264,19 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override @Override
public void lockInterruptibly() throws InterruptedException { public void lockInterruptibly() throws InterruptedException {
while (!tryLock()) { Future<Boolean> promise = subscribe();
// waiting for message try {
RedissonLockEntry entry = ENTRIES.get(getEntryName()); promise.awaitUninterruptibly();
if (entry != null) {
entry.getLatch().acquire(); while (!tryLock()) {
// waiting for message
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.getLatch().acquire();
}
} }
} finally {
release();
} }
} }
@ -257,7 +288,7 @@ public class RedissonLock extends RedissonObject implements RLock {
return tryLockInner(); return tryLockInner();
} finally { } finally {
close(); release();
} }
} }
@ -306,7 +337,7 @@ public class RedissonLock extends RedissonObject implements RLock {
} }
return true; return true;
} finally { } finally {
close(); release();
} }
} }
@ -336,7 +367,7 @@ public class RedissonLock extends RedissonObject implements RLock {
connectionManager.releaseWrite(connection); connectionManager.releaseWrite(connection);
} }
} finally { } finally {
close(); release();
} }
} }
@ -375,7 +406,7 @@ public class RedissonLock extends RedissonObject implements RLock {
connectionManager.releaseWrite(connection); connectionManager.releaseWrite(connection);
} }
} finally { } finally {
close(); release();
} }
} }
@ -393,7 +424,7 @@ public class RedissonLock extends RedissonObject implements RLock {
connectionManager.releaseRead(connection); connectionManager.releaseRead(connection);
} }
} finally { } finally {
close(); release();
} }
} }
@ -412,7 +443,7 @@ public class RedissonLock extends RedissonObject implements RLock {
connectionManager.releaseRead(connection); connectionManager.releaseRead(connection);
} }
} finally { } finally {
close(); release();
} }
} }
@ -434,31 +465,15 @@ public class RedissonLock extends RedissonObject implements RLock {
connectionManager.releaseRead(connection); connectionManager.releaseRead(connection);
} }
} finally { } finally {
close(); release();
} }
} }
@Override @Override
public void delete() { public void delete() {
forceUnlock(); forceUnlock();
ENTRIES.remove(getEntryName()); lock.readLock().lock();
}
public void close() {
release(); release();
connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry != null
&& entry.isFree()
&& ENTRIES.remove(getEntryName(), entry)) {
connectionManager.unsubscribe(getChannelName());
// connectionManager.unsubscribe(getExpireChannelName());
}
}
}, 15, TimeUnit.SECONDS);
} }
} }

@ -16,6 +16,7 @@
package org.redisson.connection; package org.redisson.connection;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.RedisConnection;
@ -43,7 +44,7 @@ public interface ConnectionManager {
<K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName); <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName);
void unsubscribe(String channelName); Future unsubscribe(String channelName);
void releaseWrite(RedisConnection сonnection); void releaseWrite(RedisConnection сonnection);

@ -17,6 +17,7 @@ package org.redisson.connection;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import java.net.URI; import java.net.URI;
@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.RedisConnectionClosedException;
import com.lambdaworks.redis.codec.RedisCodec; import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
@ -224,8 +226,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.release(); entry.release();
return oldEntry; return oldEntry;
} }
entry.subscribe(channelName);
return entry; synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(channelName);
}
entry.subscribe(channelName);
return entry;
}
} }
} }
@ -238,8 +247,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
returnSubscribeConnection(entry); returnSubscribeConnection(entry);
return oldEntry; return oldEntry;
} }
entry.subscribe(channelName);
return entry; synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(channelName);
}
entry.subscribe(channelName);
return entry;
}
} }
RedisPubSubConnection nextPubSubConnection() { RedisPubSubConnection nextPubSubConnection() {
@ -253,30 +269,42 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return сonnEntry; return сonnEntry;
} }
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values()); Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
for (PubSubConnectionEntry entry : entries) { for (PubSubConnectionEntry entry : entries) {
if (entry.tryAcquire()) { if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) { if (oldEntry != null) {
entry.release();
return oldEntry;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(listener, channelName);
}
entry.subscribe(listener, channelName);
return entry;
}
}
}
RedisPubSubConnection<K, V> conn = nextPubSubConnection();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(entry);
return oldEntry;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release(); entry.release();
return oldEntry; return subscribe(listener, channelName);
} }
entry.subscribe(listener, channelName); entry.subscribe(listener, channelName);
return entry; return entry;
} }
}
RedisPubSubConnection<K, V> conn = nextPubSubConnection();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
returnSubscribeConnection(entry);
return oldEntry;
}
entry.subscribe(listener, channelName);
return entry;
} }
void acquireMasterConnection() { void acquireMasterConnection() {
@ -290,16 +318,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
@Override @Override
public void unsubscribe(String channelName) { public Future unsubscribe(String channelName) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) { if (entry == null) {
return; return group.next().newSucceededFuture(null);
} }
entry.unsubscribe(channelName); Future future = entry.unsubscribe(channelName);
if (entry.tryClose()) { future.addListener(new FutureListener() {
returnSubscribeConnection(entry); @Override
} public void operationComplete(Future future) throws Exception {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(entry);
}
}
}
});
return future;
} }
protected void returnSubscribeConnection(PubSubConnectionEntry entry) { protected void returnSubscribeConnection(PubSubConnectionEntry entry) {

@ -15,6 +15,9 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Queue; import java.util.Queue;
@ -26,6 +29,7 @@ import java.util.concurrent.Semaphore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisConnectionClosedException;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener; import com.lambdaworks.redis.pubsub.RedisPubSubListener;
@ -132,18 +136,31 @@ public class PubSubConnectionEntry {
public void subscribe(RedisPubSubAdapter listener, String channel) { public void subscribe(RedisPubSubAdapter listener, String channel) {
conn.addListener(listener); addListener(channel, listener);
conn.subscribe(channel); conn.subscribe(channel);
} }
public void unsubscribe(String channel) { public Future unsubscribe(final String channel) {
conn.unsubscribe(channel); Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
subscribedChannelsAmount.release(); if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
Future future = conn.unsubscribe(channel);
future.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
subscribedChannelsAmount.release();
}
});
return future;
} }
public boolean tryClose() { public boolean tryClose() {
if (subscribedChannelsAmount.tryAcquire(subscriptionsPerConnection)) { if (subscribedChannelsAmount.tryAcquire(subscriptionsPerConnection)) {
conn.close(); close();
return true; return true;
} }
return false; return false;

@ -206,6 +206,11 @@ public class RedissonLockTest extends BaseConcurrentTest {
public void run(Redisson redisson) { public void run(Redisson redisson) {
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
redisson.getLock("testConcurrency_MultiInstance1").lock(); redisson.getLock("testConcurrency_MultiInstance1").lock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.set(lockedCounter.get() + 1); lockedCounter.set(lockedCounter.get() + 1);
redisson.getLock("testConcurrency_MultiInstance1").unlock(); redisson.getLock("testConcurrency_MultiInstance1").unlock();
} }

Loading…
Cancel
Save