Merge pull request #5919 from seakider/fix_append_command

Fixed - append commands error when using batch mode
pull/5924/head
Nikita Koksharov 8 months ago committed by GitHub
commit 277addd78b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -426,38 +426,31 @@ public class CommandBatchService extends CommandAsyncService {
}
for (Map.Entry<NodeSource, Entry> e : r.entrySet()) {
Entry entry = e.getValue();
if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) {
for (Entry entry : r.values()) {
BatchCommandData<?, ?> multiCommand = new BatchCommandData<>(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
entry.addFirstCommand(multiCommand);
BatchCommandData<?, ?> execCommand = new BatchCommandData<>(RedisCommands.EXEC, new Object[] {}, index.incrementAndGet());
entry.add(execCommand);
}
BatchCommandData<?, ?> multiCommand = new BatchCommandData<>(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
entry.addFirstCommand(multiCommand);
BatchCommandData<?, ?> execCommand = new BatchCommandData<>(RedisCommands.EXEC, new Object[] {}, index.incrementAndGet());
entry.add(execCommand);
}
if (this.options.isSkipResult()) {
for (Entry entry : r.values()) {
BatchCommandData<?, ?> offCommand = new BatchCommandData<>(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.addFirstCommand(offCommand);
BatchCommandData<?, ?> onCommand = new BatchCommandData<>(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.add(onCommand);
}
BatchCommandData<?, ?> offCommand = new BatchCommandData<>(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.addFirstCommand(offCommand);
BatchCommandData<?, ?> onCommand = new BatchCommandData<>(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.add(onCommand);
}
if (this.options.getSyncSlaves() > 0) {
BatchCommandData<?, ?> waitCommand;
if (this.options.isSyncAOF()) {
for (Entry entry : r.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData<>(RedisCommands.WAITAOF,
new Object[] { this.options.getSyncLocals(), this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet());
entry.add(waitCommand);
}
waitCommand = new BatchCommandData<>(RedisCommands.WAITAOF,
new Object[]{this.options.getSyncLocals(), this.options.getSyncSlaves(), this.options.getSyncTimeout()}, index.incrementAndGet());
} else {
for (Entry entry : r.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData<>(RedisCommands.WAIT,
new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet());
entry.add(waitCommand);
}
waitCommand = new BatchCommandData<>(RedisCommands.WAIT,
new Object[]{this.options.getSyncSlaves(), this.options.getSyncTimeout()}, index.incrementAndGet());
}
entry.add(waitCommand);
}
BatchOptions options = BatchOptions.defaults()

@ -38,6 +38,45 @@ public class RedissonBatchTest extends RedisDockerTest {
});
}
@ParameterizedTest
@MethodSource("data")
public void testMemoryAtomicInCluster(BatchOptions batchOptions) {
Assumptions.assumeTrue(batchOptions.getExecutionMode() == ExecutionMode.IN_MEMORY);
testInCluster(client -> {
Config config = client.getConfig();
config.useClusterServers()
.setTimeout(123000);
RedissonClient redisson = Redisson.create(config);
batchOptions
.sync(1, Duration.ofSeconds(1))
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
RBatch batch = redisson.createBatch(batchOptions);
batch.getBucket("{a}Test1").setAsync("test1");
batch.getBucket("{a}Test2").setAsync("test2");
batch.getBucket("{b}Test3").setAsync("test3");
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(2);
assertThat(result.getResponses().size()).isEqualTo(3);
batchOptions
.skipResult()
.sync(0, Duration.ofSeconds(1));
batch = redisson.createBatch(batchOptions);
batch.getBucket("{a}Test1").getAsync();
batch.getBucket("{a}Test2").getAsync();
batch.getBucket("{b}Test3").getAsync();
result = batch.execute();
assertThat(result.getResponses().size()).isEqualTo(0);
redisson.shutdown();
});
}
@ParameterizedTest
@MethodSource("data")
public void testSlotMigrationInCluster(BatchOptions batchOptions) {

Loading…
Cancel
Save