|
|
|
@ -277,18 +277,24 @@ public abstract class LocalCacheListener {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
byte[] id = commandExecutor.getServiceManager().generateIdArray();
|
|
|
|
|
RFuture<Long> future = publishAsync(id);
|
|
|
|
|
CompletionStage<Void> f = future.thenCompose(res -> {
|
|
|
|
|
if (res.intValue() == 0) {
|
|
|
|
|
return CompletableFuture.completedFuture(null);
|
|
|
|
|
}
|
|
|
|
|
RSemaphore semaphore = getClearSemaphore(id);
|
|
|
|
|
CompletionStage<Void> f = semaphore.trySetPermitsAsync(0)
|
|
|
|
|
.thenCompose(r -> semaphore.expireAsync(Duration.ofSeconds(60)))
|
|
|
|
|
.thenCompose(r -> publishAsync(id))
|
|
|
|
|
.thenCompose(res -> {
|
|
|
|
|
if (res == 0) {
|
|
|
|
|
return semaphore.deleteAsync()
|
|
|
|
|
.thenApply(r -> null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RSemaphore semaphore = getClearSemaphore(id);
|
|
|
|
|
return semaphore.tryAcquireAsync(res.intValue() - 1, 50, TimeUnit.SECONDS)
|
|
|
|
|
.thenCompose(r -> {
|
|
|
|
|
return semaphore.deleteAsync().thenApply(re -> null);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
System.out.println("res " + res);
|
|
|
|
|
return semaphore.tryAcquireAsync(res.intValue() - 1, 40, TimeUnit.SECONDS)
|
|
|
|
|
.thenCompose(r -> {
|
|
|
|
|
System.out.println("aca " + r);
|
|
|
|
|
return semaphore.deleteAsync()
|
|
|
|
|
.thenApply(re -> null);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
return new CompletableFutureWrapper<>(f);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -382,9 +388,7 @@ public abstract class LocalCacheListener {
|
|
|
|
|
|
|
|
|
|
private RSemaphore getClearSemaphore(byte[] requestId) {
|
|
|
|
|
String id = ByteBufUtil.hexDump(requestId);
|
|
|
|
|
RSemaphore semaphore = new RedissonSemaphore(commandExecutor, name + ":clear:" + id);
|
|
|
|
|
semaphore.expireAsync(Duration.ofSeconds(60));
|
|
|
|
|
return semaphore;
|
|
|
|
|
return new RedissonSemaphore(commandExecutor, name + ":clear:" + id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <K, V> int addListener(LocalCacheInvalidateListener<K, V> listener) {
|
|
|
|
|