Fixed - Redisson couldn't be shutdown if one of RBatch commands was canceled. #2275

pull/2300/head
Nikita Koksharov 6 years ago
parent c1c1e1e5bd
commit 53e4c223d6

@ -23,14 +23,12 @@ import org.redisson.api.BatchOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.BatchCommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandBatchService.ConnectionEntry;
import org.redisson.command.CommandBatchService.Entry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.AsyncSemaphore;
/**
*
@ -42,28 +40,24 @@ import org.redisson.pubsub.AsyncSemaphore;
public class BaseRedisBatchExecutor<V, R> extends RedisExecutor<V, R> {
final ConcurrentMap<MasterSlaveEntry, Entry> commands;
final ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections;
final BatchOptions options;
final AtomicInteger index;
final AtomicBoolean executed;
final AsyncSemaphore semaphore;
@SuppressWarnings("ParameterNumber")
public BaseRedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect,
ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder,
ConcurrentMap<MasterSlaveEntry, Entry> commands, ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections,
BatchOptions options, AtomicInteger index, AtomicBoolean executed, AsyncSemaphore semaphore) {
ConcurrentMap<MasterSlaveEntry, Entry> commands,
BatchOptions options, AtomicInteger index, AtomicBoolean executed) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager,
objectBuilder);
this.commands = commands;
this.connections = connections;
this.options = options;
this.index = index;
this.executed = executed;
this.semaphore = semaphore;
}
protected final void addBatchCommandData(Object[] batchParams) {

@ -40,6 +40,11 @@ public class BatchPromise<T> extends RedissonPromise<T> {
return sentPromise;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public RPromise<T> sync() throws InterruptedException {
if (executed.get()) {

@ -140,7 +140,7 @@ public class CommandBatchService extends CommandAsyncService {
executor.execute();
} else {
RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise,
true, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore);
true, connectionManager, objectBuilder, commands, options, index, executed);
executor.execute();
}
@ -149,9 +149,10 @@ public class CommandBatchService extends CommandAsyncService {
@Override
public <R> RPromise<R> createPromise() {
if (isRedisBasedQueue()) {
return new BatchPromise<R>(executed);
return new BatchPromise<>(executed);
}
return super.createPromise();
return new RedissonPromise<>();
}
public BatchResult<?> execute() {
@ -258,6 +259,11 @@ public class CommandBatchService extends CommandAsyncService {
syncedSlaves = (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
if (commandEntry.getPromise().isCancelled()) {
continue;
}
Object entryResult = commandEntry.getPromise().getNow();
try {
entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult);
@ -321,12 +327,12 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<Map<MasterSlaveEntry, List<Object>>>();
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<MasterSlaveEntry, List<Object>>();
CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<Map<MasterSlaveEntry, List<Object>>>(mainPromise, result);
RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<>();
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<>();
CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<>(mainPromise, result);
listener.setCounter(connections.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
RPromise<List<Object>> execPromise = new RedissonPromise<List<Object>>();
RPromise<List<Object>> execPromise = new RedissonPromise<>();
async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, false);
execPromise.onComplete((r, ex) -> {

@ -22,14 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.BatchOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandBatchService.ConnectionEntry;
import org.redisson.command.CommandBatchService.Entry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.AsyncSemaphore;
/**
*
@ -44,10 +42,10 @@ public class RedisBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R> {
public RedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections, BatchOptions options, AtomicInteger index,
AtomicBoolean executed, AsyncSemaphore semaphore) {
BatchOptions options, AtomicInteger index,
AtomicBoolean executed) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,
commands, connections, options, index, executed, semaphore);
commands, options, index, executed);
}
@Override

@ -96,13 +96,22 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
}
for (CommandData<?, ?> c : entry.getCommands()) {
if (c.getPromise().isSuccess() && !isWaitCommand(c) && !isAtomic) {
// skip successful commands
if ((c.getPromise().isCancelled() || c.getPromise().isSuccess())
&& !isWaitCommand(c)
&& !isAtomic) {
// skip command
continue;
}
list.add(c);
}
if (list.isEmpty()) {
writeFuture = connection.getChannel().newPromise();
attemptPromise.trySuccess(null);
timeout.cancel();
return;
}
writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued));
}

@ -52,6 +52,9 @@ import org.redisson.pubsub.AsyncSemaphore;
*/
public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R> {
private final ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections;
private final AsyncSemaphore semaphore;
@SuppressWarnings("ParameterNumber")
public RedisQueuedBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
@ -59,7 +62,10 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections, BatchOptions options, AtomicInteger index,
AtomicBoolean executed, AsyncSemaphore semaphore) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,
commands, connections, options, index, executed, semaphore);
commands, options, index, executed);
this.connections = connections;
this.semaphore = semaphore;
}
@Override

@ -381,6 +381,34 @@ public class RedissonBatchTest extends BaseTest {
Assert.assertEquals(539, res.getResponses().size());
}
@Test
public void testBatchCancel() {
RedissonClient redisson = createInstance();
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 10; i++) {
RFuture<Void> f = batch.getBucket("test").setAsync(123);
assertThat(f.cancel(true)).isTrue();
}
BatchResult<?> res = batch.execute();
Assert.assertEquals(0, res.getResponses().size());
RBatch b2 = redisson.createBatch(batchOptions);
RListAsync<Integer> listAsync2 = b2.getList("list");
for (int i = 0; i < 6; i++) {
RFuture<Boolean> t = listAsync2.addAsync(i);
assertThat(t.cancel(true)).isTrue();
}
RFuture<BatchResult<?>> res2 = b2.executeAsync();
assertThat(res2.cancel(true)).isFalse();
Assert.assertEquals(0, res.getResponses().size());
redisson.shutdown();
}
@Test
public void testBatchBigRequest() {
Config config = createConfig();

Loading…
Cancel
Save