@ -213,40 +213,17 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
// waiting for message
final long current = System . currentTimeMillis ( ) ;
final RedissonLockEntry entry = getEntry ( ) ;
synchronized ( entry ) {
if ( entry . getLatch ( ) . tryAcquire ( ) ) {
tryAcquireAsync ( time , permits , subscribeFuture , result , ttl , timeUnit ) ;
} else {
final AtomicReference < Timeout > waitTimeoutFutureRef = new AtomicReference < Timeout > ( ) ;
final Timeout scheduledFuture ;
if ( nearestTimeout ! = null ) {
scheduledFuture = commandExecutor . getConnectionManager ( ) . newTimeout ( new TimerTask ( ) {
@Override
public void run ( Timeout timeout ) throws Exception {
if ( waitTimeoutFutureRef . get ( ) ! = null & & ! waitTimeoutFutureRef . get ( ) . cancel ( ) ) {
return ;
}
long elapsed = System . currentTimeMillis ( ) - current ;
time . addAndGet ( - elapsed ) ;
tryAcquireAsync ( time , permits , subscribeFuture , result , ttl , timeUnit ) ;
}
} , nearestTimeout , TimeUnit . MILLISECONDS ) ;
} else {
scheduledFuture = null ;
}
if ( entry . getLatch ( ) . tryAcquire ( ) ) {
tryAcquireAsync ( time , permits , subscribeFuture , result , ttl , timeUnit ) ;
} else {
final AtomicReference < Timeout > waitTimeoutFutureRef = new AtomicReference < Timeout > ( ) ;
final Runnable listener = new Runnable ( ) {
final Timeout scheduledFuture ;
if ( nearestTimeout ! = null ) {
scheduledFuture = commandExecutor . getConnectionManager ( ) . newTimeout ( new TimerTask ( ) {
@Override
public void run ( ) {
public void run ( Timeout timeout ) throws Exception {
if ( waitTimeoutFutureRef . get ( ) ! = null & & ! waitTimeoutFutureRef . get ( ) . cancel ( ) ) {
entry . getLatch ( ) . release ( ) ;
return ;
}
if ( scheduledFuture ! = null & & ! scheduledFuture . cancel ( ) ) {
entry . getLatch ( ) . release ( ) ;
return ;
}
@ -255,29 +232,48 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
tryAcquireAsync ( time , permits , subscribeFuture , result , ttl , timeUnit ) ;
}
} ;
entry . addListener ( listener ) ;
} , nearestTimeout , TimeUnit . MILLISECONDS ) ;
} else {
scheduledFuture = null ;
}
long t = time . get ( ) ;
Timeout waitTimeoutFuture = commandExecutor . getConnectionManager ( ) . newTimeout ( new TimerTask ( ) {
@Override
public void run ( Timeout timeout ) throws Exception {
if ( scheduledFuture ! = null & & ! scheduledFuture . cancel ( ) ) {
return ;
}
final Runnable listener = new Runnable ( ) {
@Override
public void run ( ) {
if ( waitTimeoutFutureRef . get ( ) ! = null & & ! waitTimeoutFutureRef . get ( ) . cancel ( ) ) {
entry . getLatch ( ) . release ( ) ;
return ;
}
if ( scheduledFuture ! = null & & ! scheduledFuture . cancel ( ) ) {
entry . getLatch ( ) . release ( ) ;
return ;
}
long elapsed = System . currentTimeMillis ( ) - current ;
time . addAndGet ( - elapsed ) ;
synchronized ( entry ) {
if ( entry . removeListener ( listener ) ) {
long elapsed = System . currentTimeMillis ( ) - current ;
time . addAndGet ( - elapsed ) ;
tryAcquireAsync ( time , permits , subscribeFuture , result , ttl , timeUnit ) ;
}
}
tryAcquireAsync ( time , permits , subscribeFuture , result , ttl , timeUnit ) ;
}
} ;
entry . addListener ( listener ) ;
long t = time . get ( ) ;
Timeout waitTimeoutFuture = commandExecutor . getConnectionManager ( ) . newTimeout ( new TimerTask ( ) {
@Override
public void run ( Timeout timeout ) throws Exception {
if ( scheduledFuture ! = null & & ! scheduledFuture . cancel ( ) ) {
return ;
}
} , t , TimeUnit . MILLISECONDS ) ;
waitTimeoutFutureRef . set ( waitTimeoutFuture ) ;
}
if ( entry . removeListener ( listener ) ) {
long elapsed = System . currentTimeMillis ( ) - current ;
time . addAndGet ( - elapsed ) ;
tryAcquireAsync ( time , permits , subscribeFuture , result , ttl , timeUnit ) ;
}
}
} , t , TimeUnit . MILLISECONDS ) ;
waitTimeoutFutureRef . set ( waitTimeoutFuture ) ;
}
}
} ) ;
@ -318,34 +314,32 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
final RedissonLockEntry entry = getEntry ( ) ;
synchronized ( entry ) {
if ( entry . getLatch ( ) . tryAcquire ( permits ) ) {
acquireAsync ( permits , subscribeFuture , result , ttl , timeUnit ) ;
} else {
final Timeout scheduledFuture ;
if ( nearestTimeout ! = null ) {
scheduledFuture = commandExecutor . getConnectionManager ( ) . newTimeout ( new TimerTask ( ) {
@Override
public void run ( Timeout timeout ) throws Exception {
acquireAsync ( permits , subscribeFuture , result , ttl , timeUnit ) ;
}
} , nearestTimeout , TimeUnit . MILLISECONDS ) ;
} else {
scheduledFuture = null ;
}
Runnable listener = new Runnable ( ) {
if ( entry . getLatch ( ) . tryAcquire ( permits ) ) {
acquireAsync ( permits , subscribeFuture , result , ttl , timeUnit ) ;
} else {
final Timeout scheduledFuture ;
if ( nearestTimeout ! = null ) {
scheduledFuture = commandExecutor . getConnectionManager ( ) . newTimeout ( new TimerTask ( ) {
@Override
public void run ( ) {
if ( scheduledFuture ! = null & & ! scheduledFuture . cancel ( ) ) {
entry . getLatch ( ) . release ( ) ;
return ;
}
public void run ( Timeout timeout ) throws Exception {
acquireAsync ( permits , subscribeFuture , result , ttl , timeUnit ) ;
}
} ;
entry . addListener ( listener ) ;
} , nearestTimeout , TimeUnit . MILLISECONDS ) ;
} else {
scheduledFuture = null ;
}
Runnable listener = new Runnable ( ) {
@Override
public void run ( ) {
if ( scheduledFuture ! = null & & ! scheduledFuture . cancel ( ) ) {
entry . getLatch ( ) . release ( ) ;
return ;
}
acquireAsync ( permits , subscribeFuture , result , ttl , timeUnit ) ;
}
} ;
entry . addListener ( listener ) ;
}
}
} ) ;