Fixed - error handling during RBatch execution in executionMode = REDIS_WRITE_ATOMIC/REDIS_READ_ATOMIC

pull/1499/head
Nikita 7 years ago
parent d7e2b718ca
commit 1133f4a1dc

@ -229,6 +229,7 @@ public class CommandBatchService extends CommandAsyncService {
BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleError(sentPromise, cause);
super.handleError(promise, cause);
semaphore.release();
return;
}
@ -462,39 +463,42 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : future.getNow().entrySet()) {
Entry commandEntry = commands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
try {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : future.getNow().entrySet()) {
Entry commandEntry = commands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
}
RPromise<Object> promise = (RPromise<Object>) data.getPromise();
promise.trySuccess(resultIter.next());
}
RPromise<Object> promise = (RPromise<Object>) data.getPromise();
promise.trySuccess(resultIter.next());
}
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
syncedSlaves += (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult);
responses.add(entryResult);
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
syncedSlaves += (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult);
responses.add(entryResult);
}
}
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
resultPromise.trySuccess((R)result);
} catch (Exception e) {
resultPromise.tryFailure(e);
}
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
resultPromise.trySuccess((R)result);
commands = null;
}
});

@ -3,6 +3,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -28,6 +29,7 @@ import org.redisson.api.RBatch;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCache;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScript;
import org.redisson.api.RScript.Mode;
@ -135,15 +137,20 @@ public class RedissonBatchTest extends BaseTest {
process.shutdown();
}
@Test
public void testWriteTimeout() {
RBatch batch = redisson.createBatch(batchOptions);
RMapCacheAsync<String, String> map = batch.getMapCache("test");
for (int i = 0; i < 200000; i++) {
RMapCacheAsync<String, String> map = batch.getMapCache("test");
map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS);
RFuture<String> f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES);
if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
f.syncUninterruptibly();
}
}
batch.execute();
assertThat(redisson.getMapCache("test").size()).isEqualTo(200000);
}
@Test

Loading…
Cancel
Save