refactoring

pull/4536/merge
Nikita Koksharov 2 years ago
parent 1a324c3057
commit 44ca112260

@ -74,73 +74,83 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
private final Queue<CompletableFuture<Long>> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean isWorkerActive = new AtomicBoolean();
private void send() {
private void startIdRequestsHandle() {
if (!isWorkerActive.compareAndSet(false, true)
|| commandExecutor.getConnectionManager().getExecutor().isShutdown()) {
return;
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
while (true) {
if (queue.peek() == null) {
isWorkerActive.set(false);
if (!queue.isEmpty()) {
send();
handleIdRequests();
}
private void handleIdRequests() {
if (queue.peek() == null) {
isWorkerActive.set(false);
if (!queue.isEmpty()) {
startIdRequestsHandle();
}
return;
}
long v = counter.decrementAndGet();
if (v >= 0) {
CompletableFuture<Long> pp = queue.poll();
if (pp != null) {
pp.complete(start.incrementAndGet());
handleIdRequests();
} else {
counter.incrementAndGet();
isWorkerActive.set(false);
}
} else {
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local allocationSize = redis.call('get', KEYS[2]); " +
"if allocationSize == false then " +
"allocationSize = 5000; " +
"redis.call('set', KEYS[2], allocationSize);" +
"end;" +
"local value = redis.call('get', KEYS[1]); " +
"if value == false then " +
"redis.call('incr', KEYS[1]);" +
"value = 1; " +
"end; " +
"redis.call('incrby', KEYS[1], allocationSize); " +
"return {value, allocationSize}; ",
Arrays.asList(getRawName(), getAllocationSizeName()));
future.whenComplete((res, ex) -> {
if (ex != null) {
if (ex instanceof RedissonShutdownException) {
return;
}
break;
log.error(ex.getMessage(), ex);
commandExecutor.getConnectionManager().newTimeout(task -> {
handleIdRequests();
}, 1, TimeUnit.SECONDS);
return;
}
long v = counter.decrementAndGet();
if (v >= 0) {
CompletableFuture<Long> pp = queue.poll();
pp.complete(start.incrementAndGet());
} else {
try {
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local allocationSize = redis.call('get', KEYS[2]); " +
"if allocationSize == false then " +
"allocationSize = 5000; " +
"redis.call('set', KEYS[2], allocationSize);" +
"end;" +
"local value = redis.call('get', KEYS[1]); " +
"if value == false then " +
"redis.call('incr', KEYS[1]);" +
"value = 1; " +
"end; " +
"redis.call('incrby', KEYS[1], allocationSize); " +
"return {value, allocationSize}; ",
Arrays.asList(getRawName(), getAllocationSizeName()));
List<Object> res = get(future);
long value = (long) res.get(0);
long allocationSize = (long) res.get(1);
start.set(value);
counter.set(allocationSize);
CompletableFuture<Long> pp = queue.poll();
counter.decrementAndGet();
pp.complete(start.get());
} catch (Exception e) {
if (e instanceof RedissonShutdownException) {
break;
}
log.error(e.getMessage(), e);
isWorkerActive.set(false);
send();
break;
}
long value = (long) res.get(0);
long allocationSize = (long) res.get(1);
start.set(value);
counter.set(allocationSize);
CompletableFuture<Long> pp = queue.poll();
if (pp != null) {
counter.decrementAndGet();
pp.complete(start.get());
}
}
});
handleIdRequests();
});
}
}
@Override
public RFuture<Long> nextIdAsync() {
CompletableFuture<Long> promise = new CompletableFuture<>();
queue.add(promise);
send();
startIdRequestsHandle();
return new CompletableFutureWrapper<>(promise);
}

Loading…
Cancel
Save