|
|
|
@ -58,26 +58,14 @@ public class RedissonLock extends RedissonExpirable implements RLock {
|
|
|
|
|
this.id = id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void unsubscribe() {
|
|
|
|
|
while (true) {
|
|
|
|
|
RedissonLockEntry entry = ENTRIES.get(getEntryName());
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
|
|
|
|
|
newEntry.release();
|
|
|
|
|
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
|
|
|
|
|
if (newEntry.isFree()
|
|
|
|
|
&& ENTRIES.remove(getEntryName(), newEntry)) {
|
|
|
|
|
synchronized (ENTRIES) {
|
|
|
|
|
// maybe added during subscription
|
|
|
|
|
if (!ENTRIES.containsKey(getEntryName())) {
|
|
|
|
|
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
private void unsubscribe(RedissonLockEntry entry) {
|
|
|
|
|
synchronized (ENTRIES) {
|
|
|
|
|
if (entry.release() == 0) {
|
|
|
|
|
// just an assertion
|
|
|
|
|
boolean removed = ENTRIES.remove(getEntryName()) == entry;
|
|
|
|
|
if (removed) {
|
|
|
|
|
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -86,63 +74,49 @@ public class RedissonLock extends RedissonExpirable implements RLock {
|
|
|
|
|
return id + ":" + getName();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Promise<Boolean> aquire() {
|
|
|
|
|
while (true) {
|
|
|
|
|
private Future<RedissonLockEntry> subscribe() {
|
|
|
|
|
synchronized (ENTRIES) {
|
|
|
|
|
RedissonLockEntry entry = ENTRIES.get(getEntryName());
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
|
|
|
|
|
newEntry.aquire();
|
|
|
|
|
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
|
|
|
|
|
return newEntry.getPromise();
|
|
|
|
|
if (entry != null) {
|
|
|
|
|
entry.aquire();
|
|
|
|
|
return entry.getPromise();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Future<Boolean> subscribe() {
|
|
|
|
|
Promise<Boolean> promise = aquire();
|
|
|
|
|
if (promise != null) {
|
|
|
|
|
return promise;
|
|
|
|
|
}
|
|
|
|
|
Promise<RedissonLockEntry> newPromise = newPromise();
|
|
|
|
|
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
|
|
|
|
|
value.aquire();
|
|
|
|
|
|
|
|
|
|
Promise<Boolean> newPromise = newPromise();
|
|
|
|
|
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
|
|
|
|
|
value.aquire();
|
|
|
|
|
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
|
|
|
|
|
if (oldValue != null) {
|
|
|
|
|
Promise<Boolean> oldPromise = aquire();
|
|
|
|
|
if (oldPromise == null) {
|
|
|
|
|
return subscribe();
|
|
|
|
|
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
|
|
|
|
|
if (oldValue != null) {
|
|
|
|
|
oldValue.aquire();
|
|
|
|
|
return oldValue.getPromise();
|
|
|
|
|
}
|
|
|
|
|
return oldPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
|
|
|
|
|
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onMessage(String channel, Integer message) {
|
|
|
|
|
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
|
|
|
|
|
value.getLatch().release();
|
|
|
|
|
@Override
|
|
|
|
|
public void onMessage(String channel, Integer message) {
|
|
|
|
|
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
|
|
|
|
|
value.getLatch().release();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean onStatus(PubSubType type, String channel) {
|
|
|
|
|
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
|
|
|
|
|
value.getPromise().setSuccess(true);
|
|
|
|
|
return true;
|
|
|
|
|
@Override
|
|
|
|
|
public boolean onStatus(PubSubType type, String channel) {
|
|
|
|
|
if (channel.equals(getChannelName())
|
|
|
|
|
&& !value.getPromise().isSuccess()
|
|
|
|
|
&& type == PubSubType.SUBSCRIBE) {
|
|
|
|
|
value.getPromise().setSuccess(value);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
};
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
synchronized (ENTRIES) {
|
|
|
|
|
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
|
|
|
|
|
return newPromise;
|
|
|
|
|
}
|
|
|
|
|
return newPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getChannelName() {
|
|
|
|
@ -186,7 +160,8 @@ public class RedissonLock extends RedissonExpirable implements RLock {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
subscribe().awaitUninterruptibly();
|
|
|
|
|
Future<RedissonLockEntry> future = subscribe();
|
|
|
|
|
future.awaitUninterruptibly();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
while (true) {
|
|
|
|
@ -209,7 +184,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
unsubscribe();
|
|
|
|
|
unsubscribe(future.getNow());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -290,7 +265,8 @@ public class RedissonLock extends RedissonExpirable implements RLock {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!subscribe().awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
|
|
|
|
|
Future<RedissonLockEntry> future = subscribe();
|
|
|
|
|
if (!future.awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -325,7 +301,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
} finally {
|
|
|
|
|
unsubscribe();
|
|
|
|
|
unsubscribe(future.getNow());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|