RCountDownLatch and RLock refactoring.

pull/337/head
Nikita 9 years ago
parent d84b98627d
commit 2436dcc52f

@ -0,0 +1,13 @@
package org.redisson;
import io.netty.util.concurrent.Promise;
public interface PubSubEntry<E> {
void aquire();
int release();
Promise<E> getPromise();
}

@ -15,22 +15,17 @@
*/
package org.redisson;
import java.util.Collections;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RCountDownLatch;
import org.redisson.pubsub.CountDownLatchPubSub;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
* Distributed alternative to the {@link java.util.concurrent.CountDownLatch}
@ -43,10 +38,10 @@ import io.netty.util.internal.PlatformDependent;
*/
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {
private static final Long zeroCountMessage = 0L;
private static final Long newCountMessage = 1L;
public static final Long zeroCountMessage = 0L;
public static final Long newCountMessage = 1L;
private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();
private static final CountDownLatchPubSub PUBSUB = new CountDownLatchPubSub();
private final UUID id;
@ -55,72 +50,6 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
this.id = id;
}
private Future<RedissonCountDownLatchEntry> subscribe() {
synchronized (ENTRIES) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.aquire();
return entry.getPromise();
}
Promise<RedissonCountDownLatchEntry> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}
RedisPubSubListener<Long> listener = createListener(value);
commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener);
return newPromise;
}
}
private RedisPubSubListener<Long> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {
@Override
public void onMessage(String channel, Long message) {
if (!getChannelName().equals(channel)) {
return;
}
if (message.equals(zeroCountMessage)) {
value.getLatch().open();
}
if (message.equals(newCountMessage)) {
value.getLatch().close();
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName())
&& type == PubSubType.SUBSCRIBE) {
value.getPromise().trySuccess(value);
return true;
}
return false;
}
};
return listener;
}
private void unsubscribe(RedissonCountDownLatchEntry entry) {
synchronized (ENTRIES) {
if (entry.release() == 0) {
// just an assertion
boolean removed = ENTRIES.remove(getEntryName()) == entry;
if (removed) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
}
}
public void await() throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
@ -128,17 +57,16 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
while (getCountInner() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await();
}
}
} finally {
unsubscribe(promise.getNow());
unsubscribe(promise);
}
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
@ -154,7 +82,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
long current = System.currentTimeMillis();
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await(time, TimeUnit.MILLISECONDS);
}
@ -165,18 +93,30 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return true;
} finally {
unsubscribe(promise.getNow());
unsubscribe(promise);
}
}
private RedissonCountDownLatchEntry getEntry() {
return PUBSUB.getEntry(getEntryName());
}
private Future<RedissonCountDownLatchEntry> subscribe() {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}
private void unsubscribe(Future<RedissonCountDownLatchEntry> future) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}
@Override
public void countDown() {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;" +
"return true",
Collections.<Object>singletonList(getName()), zeroCountMessage, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
get(f);
}
@ -185,7 +125,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
private String getChannelName() {
return "redisson_countdownlatch_{" + getName() + "}";
return "redisson_countdownlatch__channel__{" + getName() + "}";
}
@Override
@ -204,28 +144,28 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean trySetCount(long count) {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', ARGV[3], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "else "
+ "return false "
+ "end",
Collections.<Object>singletonList(getName()), newCountMessage, count, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
return get(f);
}
@Override
public Future<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 then "
+ "redis.call('publish', ARGV[2], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "else "
+ "return false "
+ "end",
Collections.<Object>singletonList(getName()), newCountMessage, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage);
}
}

@ -19,7 +19,7 @@ import org.redisson.misc.ReclosableLatch;
import io.netty.util.concurrent.Promise;
public class RedissonCountDownLatchEntry {
public class RedissonCountDownLatchEntry implements PubSubEntry<RedissonCountDownLatchEntry> {
private int counter;

@ -15,24 +15,22 @@
*/
package org.redisson;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandExecutor;
import org.redisson.core.RLock;
import org.redisson.pubsub.LockPubSub;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
@ -49,13 +47,13 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private static final ConcurrentMap<String, Timeout> refreshTaskMap = PlatformDependent.newConcurrentHashMap();
protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);
private final UUID id;
final UUID id;
private static final Long unlockMessage = 0L;
public static final Long unlockMessage = 0L;
private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();
private static final LockPubSub PUBSUB = new LockPubSub();
private final CommandExecutor commandExecutor;
final CommandExecutor commandExecutor;
protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
@ -63,68 +61,12 @@ public class RedissonLock extends RedissonExpirable implements RLock {
this.id = id;
}
private void unsubscribe(RedissonLockEntry entry) {
synchronized (ENTRIES) {
if (entry.release() == 0) {
// just an assertion
boolean removed = ENTRIES.remove(getEntryName()) == entry;
if (removed) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
}
}
private String getEntryName() {
return id + ":" + getName();
}
private Future<RedissonLockEntry> subscribe() {
synchronized (ENTRIES) {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.aquire();
return entry.getPromise();
}
Promise<RedissonLockEntry> newPromise = newPromise();
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
value.aquire();
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {
@Override
public void onMessage(String channel, Long message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
value.getLatch().release();
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName())
&& type == PubSubType.SUBSCRIBE) {
value.getPromise().trySuccess(value);
return true;
}
return false;
}
};
commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener);
return newPromise;
}
}
private String getChannelName() {
return "redisson__lock__channel__{" + getName() + "}";
String getChannelName() {
return "redisson_lock__channel__{" + getName() + "}";
}
@Override
@ -180,7 +122,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
// waiting for message
RedissonLockEntry entry = ENTRIES.get(getEntryName());
RedissonLockEntry entry = getEntry();
if (ttl >= 0) {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
@ -188,7 +130,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
}
} finally {
unsubscribe(future.getNow());
unsubscribe(future);
}
}
@ -229,7 +171,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
* Stop refresh timer
* @return true if timer was stopped successfully
*/
private void stopRefreshTask() {
void stopRefreshTask() {
Timeout task = refreshTaskMap.remove(getName());
if (task != null) {
task.cancel();
@ -237,7 +179,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
private Long tryLockInner(final long leaseTime, final TimeUnit unit) {
Long tryLockInner(final long leaseTime, final TimeUnit unit) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
@ -248,7 +190,8 @@ public class RedissonLock extends RedissonExpirable implements RLock {
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " +
" o['c'] = o['c'] + 1; " +
" redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " +
" return nil; " +
" end;" +
" return redis.call('pttl', KEYS[1]); " +
@ -292,7 +235,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// waiting for message
long current = System.currentTimeMillis();
RedissonLockEntry entry = ENTRIES.get(getEntryName());
RedissonLockEntry entry = getEntry();
if (ttl >= 0 && ttl < time) {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
@ -305,10 +248,22 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
return true;
} finally {
unsubscribe(future.getNow());
unsubscribe(future);
}
}
private RedissonLockEntry getEntry() {
return PUBSUB.getEntry(getEntryName());
}
private Future<RedissonLockEntry> subscribe() {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}
private void unsubscribe(Future<RedissonLockEntry> future) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return tryLock(time, -1, unit);
@ -316,10 +271,10 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R2,
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('publish', ARGV[4], ARGV[2]); " +
" redis.call('publish', KEYS[2], ARGV[2]); " +
" return true; " +
"else " +
" local o = cjson.decode(v); " +
@ -330,13 +285,13 @@ public class RedissonLock extends RedissonExpirable implements RLock {
" return false;"+
" else " +
" redis.call('del', KEYS[1]);" +
" redis.call('publish', ARGV[4], ARGV[2]); " +
" redis.call('publish', KEYS[2], ARGV[2]); " +
" return true;"+
" end" +
" end;" +
" return nil; " +
"end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime);
if (opStatus == null) {
throw new IllegalStateException("Can't unlock lock Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
@ -357,21 +312,21 @@ public class RedissonLock extends RedissonExpirable implements RLock {
get(forceUnlockAsync());
}
private Future<Boolean> forceUnlockAsync() {
Future<Boolean> forceUnlockAsync() {
stopRefreshTask();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', ARGV[2], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "else "
+ "return false "
+ "end",
Collections.<Object>singletonList(getName()), unlockMessage, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), unlockMessage);
}
@Override
public boolean isLocked() {
return commandExecutor.read(getName(), RedisCommands.EXISTS, getName());
return isExists();
}
@Override

@ -19,19 +19,13 @@ import io.netty.util.concurrent.Promise;
import java.util.concurrent.Semaphore;
public class RedissonLockEntry {
public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
private int counter;
private final Semaphore latch;
private final Promise<RedissonLockEntry> promise;
public RedissonLockEntry(RedissonLockEntry source) {
counter = source.counter;
latch = source.latch;
promise = source.promise;
}
public RedissonLockEntry(Promise<RedissonLockEntry> promise) {
super();
this.latch = new Semaphore(0);
@ -54,26 +48,4 @@ public class RedissonLockEntry {
return latch;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + counter;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RedissonLockEntry other = (RedissonLockEntry) obj;
if (counter != other.counter)
return false;
return true;
}
}

@ -91,7 +91,7 @@ public interface RLock extends Lock, RExpirable {
void forceUnlock();
/**
* Checks if this lock locked by any thread in Redisson cluster
* Checks if this lock locked by any thread
*
* @return <code>true</code> if locked otherwise <code>false</code>
*/

@ -0,0 +1,54 @@
package org.redisson.pubsub;
import org.redisson.RedissonCountDownLatch;
import org.redisson.RedissonCountDownLatchEntry;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.util.concurrent.Promise;
public class CountDownLatchPubSub extends PublishSubscribe<RedissonCountDownLatchEntry> {
@Override
protected RedissonCountDownLatchEntry createEntry(Promise<RedissonCountDownLatchEntry> newPromise) {
return new RedissonCountDownLatchEntry(newPromise);
}
@Override
protected RedisPubSubListener<Long> createListener(final String channelName, final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {
@Override
public void onMessage(String channel, Long message) {
if (!channelName.equals(channel)) {
return;
}
if (message.equals(RedissonCountDownLatch.zeroCountMessage)) {
value.getLatch().open();
}
if (message.equals(RedissonCountDownLatch.newCountMessage)) {
value.getLatch().close();
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (!channelName.equals(channel)) {
return false;
}
if (type == PubSubType.SUBSCRIBE) {
value.getPromise().trySuccess(value);
return true;
}
return false;
}
};
return listener;
}
}

@ -0,0 +1,51 @@
package org.redisson.pubsub;
import org.redisson.RedissonLock;
import org.redisson.RedissonLockEntry;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.util.concurrent.Promise;
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected RedissonLockEntry createEntry(Promise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected RedisPubSubListener<Long> createListener(final String channelName, final RedissonLockEntry value) {
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {
@Override
public void onMessage(String channel, Long message) {
if (!channelName.equals(channel)) {
return;
}
if (message.equals(RedissonLock.unlockMessage)) {
value.getLatch().release();
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (!channelName.equals(channel)) {
return false;
}
if (type == PubSubType.SUBSCRIBE) {
value.getPromise().trySuccess(value);
return true;
}
return false;
}
};
return listener;
}
}

@ -0,0 +1,61 @@
package org.redisson.pubsub;
import java.util.concurrent.ConcurrentMap;
import org.redisson.PubSubEntry;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
abstract class PublishSubscribe<E extends PubSubEntry<E>> {
private final ConcurrentMap<String, E> entries = PlatformDependent.newConcurrentHashMap();
public void unsubscribe(E entry, String entryName, String channelName, ConnectionManager connectionManager) {
synchronized (this) {
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (removed) {
connectionManager.unsubscribe(channelName);
}
}
}
}
public E getEntry(String entryName) {
return entries.get(entryName);
}
public Future<E> subscribe(String entryName, String channelName, ConnectionManager connectionManager) {
synchronized (this) {
E entry = entries.get(entryName);
if (entry != null) {
entry.aquire();
return entry.getPromise();
}
Promise<E> newPromise = connectionManager.newPromise();
E value = createEntry(newPromise);
value.aquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}
RedisPubSubListener<Long> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener);
return newPromise;
}
}
protected abstract E createEntry(Promise<E> newPromise);
protected abstract RedisPubSubListener<Long> createListener(String channelName, E value);
}
Loading…
Cancel
Save