Batch acquire/release handling

pull/243/head
Nikita 10 years ago
parent 7c9e7c4239
commit a7bfc22e12

@ -126,15 +126,6 @@ public class CommandBatchExecutorService extends CommandExecutorService {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
if (!connectionManager.getShutdownLatch().acquireAmount(commands.size())) {
IllegalStateException fail = new IllegalStateException("Redisson is shutdown");
for (Entry e : commands.values()) {
for (CommandEntry entry : e.getCommands()) {
entry.getCommand().getPromise().setFailure(fail);
}
}
return connectionManager.getGroup().next().newFailedFuture(fail);
}
if (commands.isEmpty()) {
return connectionManager.getGroup().next().newSucceededFuture(null);
@ -171,6 +162,11 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}
public void execute(final Entry entry, final int slot, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
}
final Promise<Void> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
@ -203,7 +199,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
connection.send(new CommandsData(mainPromise, list));
connection.send(new CommandsData(attemptPromise, list));
ex.set(new RedisTimeoutException());
Timeout timeout = connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);

Loading…
Cancel
Save