refactoring

pull/2450/head
Nikita Koksharov 5 years ago
parent d9d346fa33
commit 26cda95ef0

@ -17,9 +17,14 @@ package org.redisson;
import org.redisson.misc.RPromise;
/**
*
* Nikita Koksharov
*
*/
public interface PubSubEntry<E> {
void aquire();
void acquire();
int release();

@ -47,16 +47,17 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
public void await() throws InterruptedException {
if (getCount() == 0) {
return;
}
RFuture<RedissonCountDownLatchEntry> future = subscribe();
try {
commandExecutor.syncSubscription(future);
while (getCount() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await();
}
future.getNow().getLatch().await();
}
} finally {
unsubscribe(future);
@ -67,6 +68,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
public boolean await(long time, TimeUnit unit) throws InterruptedException {
long remainTime = unit.toMillis(time);
long current = System.currentTimeMillis();
if (getCount() == 0) {
return true;
}
RFuture<RedissonCountDownLatchEntry> promise = subscribe();
if (!promise.await(time, unit)) {
return false;
@ -84,10 +88,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
current = System.currentTimeMillis();
// waiting for open state
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await(remainTime, TimeUnit.MILLISECONDS);
}
promise.getNow().getLatch().await(remainTime, TimeUnit.MILLISECONDS);
remainTime -= System.currentTimeMillis() - current;
}
@ -98,10 +99,6 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
}
private RedissonCountDownLatchEntry getEntry() {
return pubSub.getEntry(getEntryName());
}
private RFuture<RedissonCountDownLatchEntry> subscribe() {
return pubSub.subscribe(getEntryName(), getChannelName());
}

@ -31,7 +31,7 @@ public class RedissonCountDownLatchEntry implements PubSubEntry<RedissonCountDow
this.promise = promise;
}
public void aquire() {
public void acquire() {
counter++;
}

@ -57,11 +57,6 @@ public class RedissonFairLock extends RedissonLock implements RLock {
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
@Override
protected RedissonLockEntry getEntry(long threadId) {
return pubSub.getEntry(getEntryName() + ":" + threadId);
}
@Override
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName() + ":" + threadId,

@ -192,18 +192,18 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// waiting for message
if (ttl >= 0) {
try {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
future.getNow().getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
@ -416,9 +416,9 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
@ -433,10 +433,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
protected RedissonLockEntry getEntry(long threadId) {
return pubSub.getEntry(getEntryName());
}
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
@ -657,7 +653,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return;
}
RedissonLockEntry entry = getEntry(currentThreadId);
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire()) {
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
} else {
@ -826,7 +822,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// waiting for message
long current = System.currentTimeMillis();
RedissonLockEntry entry = getEntry(currentThreadId);
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire()) {
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
} else {

@ -34,7 +34,7 @@ public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
this.promise = promise;
}
public void aquire() {
public void acquire() {
counter++;
}

@ -107,9 +107,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
if (nearestTimeout != null) {
getEntry().getLatch().tryAcquire(permits, nearestTimeout, TimeUnit.MILLISECONDS);
future.getNow().getLatch().tryAcquire(permits, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
getEntry().getLatch().acquire(permits);
future.getNow().getLatch().acquire(permits);
}
}
} finally {
@ -200,7 +200,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
// waiting for message
long current = System.currentTimeMillis();
RedissonLockEntry entry = getEntry();
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire()) {
tryAcquireAsync(time, permits, subscribeFuture, result, ttl, timeUnit);
} else {
@ -294,7 +294,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
nearestTimeout = null;
}
RedissonLockEntry entry = getEntry();
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire(permits)) {
acquireAsync(permits, subscribeFuture, result, ttl, timeUnit);
} else {
@ -461,9 +461,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
current = System.currentTimeMillis();
if (nearestTimeout != null) {
getEntry().getLatch().tryAcquire(permits, Math.min(time, nearestTimeout), TimeUnit.MILLISECONDS);
future.getNow().getLatch().tryAcquire(permits, Math.min(time, nearestTimeout), TimeUnit.MILLISECONDS);
} else {
getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
future.getNow().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
}
long elapsed = System.currentTimeMillis() - current;
@ -540,11 +540,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return result;
}
private RedissonLockEntry getEntry() {
return semaphorePubSub.getEntry(getName());
}
private RFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getName(), getChannelName());
}

@ -85,7 +85,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return;
}
getEntry().getLatch().acquire(permits);
future.getNow().getLatch().acquire(permits);
}
} finally {
unsubscribe(future);
@ -168,7 +168,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
// waiting for message
long current = System.currentTimeMillis();
RedissonLockEntry entry = getEntry();
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire()) {
tryAcquireAsync(time, permits, subscribeFuture, result);
} else {
@ -228,7 +228,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return;
}
RedissonLockEntry entry = getEntry();
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire(permits)) {
acquireAsync(permits, subscribeFuture, result);
} else {
@ -318,7 +318,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
// waiting for message
current = System.currentTimeMillis();
getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
future.getNow().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
time -= System.currentTimeMillis() - current;
if (time <= 0) {
@ -398,11 +398,6 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return result;
}
private RedissonLockEntry getEntry() {
return semaphorePubSub.getEntry(getName());
}
private RFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getName(), getChannelName());
}

@ -66,10 +66,6 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
public E getEntry(String entryName) {
return entries.get(entryName);
}
public RFuture<E> subscribe(String entryName, String channelName) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
@ -86,18 +82,18 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
public void run() {
E entry = entries.get(entryName);
if (entry != null) {
entry.aquire();
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
E value = createEntry(newPromise);
value.aquire();
value.acquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
return;

Loading…
Cancel
Save