Fixed - Node hasn't been discovered yet error isn't resolved by a new attempt for RBatch and RLock objects

pull/4913/head
Nikita Koksharov
parent 5c94db4189
commit 2d2b4d4980

@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class BaseRedisBatchExecutor<V, R> extends RedisExecutor<V, R> {
final ConcurrentMap<MasterSlaveEntry, Entry> commands;
final ConcurrentMap<NodeSource, Entry> commands;
final BatchOptions options;
final AtomicInteger index;
@ -49,7 +49,7 @@ public class BaseRedisBatchExecutor<V, R> extends RedisExecutor<V, R> {
public BaseRedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, CompletableFuture<R> mainPromise, boolean ignoreRedirect,
ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder,
ConcurrentMap<MasterSlaveEntry, Entry> commands,
ConcurrentMap<NodeSource, Entry> commands,
BatchOptions options, AtomicInteger index, AtomicBoolean executed, RedissonObjectBuilder.ReferenceType referenceType,
boolean noRetry) {
@ -87,8 +87,7 @@ public class BaseRedisBatchExecutor<V, R> extends RedisExecutor<V, R> {
}
protected final void addBatchCommandData(Object[] batchParams) {
MasterSlaveEntry msEntry = getEntry();
Entry entry = commands.computeIfAbsent(msEntry, k -> new Entry());
Entry entry = commands.computeIfAbsent(source, k -> new Entry());
if (!readOnlyMode) {
entry.setReadOnlyMode(false);

@ -108,7 +108,8 @@ public class CommandBatchService extends CommandAsyncService {
private final AtomicInteger index = new AtomicInteger();
private final ConcurrentMap<MasterSlaveEntry, Entry> commands = new ConcurrentHashMap<>();
private final ConcurrentMap<NodeSource, Entry> commands = new ConcurrentHashMap<>();
private Map<MasterSlaveEntry, Entry> aggregatedCommands;
private final ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = new ConcurrentHashMap<>();
private final BatchOptions options;
@ -154,7 +155,8 @@ public class CommandBatchService extends CommandAsyncService {
if (isRedisBasedQueue()) {
boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise,
false, connectionManager, objectBuilder, commands, connections, options, index, executed, referenceType, noRetry);
false, connectionManager, objectBuilder, commands, connections, options, index, executed,
referenceType, noRetry, aggregatedCommands);
executor.execute();
} else {
RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise,
@ -224,32 +226,6 @@ public class CommandBatchService extends CommandAsyncService {
return executeRedisBasedQueue();
}
if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
entry.getCommands().addFirst(multiCommand);
BatchCommandData<?, ?> execCommand = new BatchCommandData(RedisCommands.EXEC, new Object[] {}, index.incrementAndGet());
entry.getCommands().add(execCommand);
}
}
if (this.options.isSkipResult()) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(offCommand);
BatchCommandData<?, ?> onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.getCommands().add(onCommand);
}
}
if (this.options.getSyncSlaves() > 0) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT,
new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet());
entry.getCommands().add(waitCommand);
}
}
CompletableFuture<BatchResult<?>> promise = new CompletableFuture<>();
CompletableFuture<Void> voidPromise = new CompletableFuture<>();
if (this.options.isSkipResult()
@ -258,17 +234,19 @@ public class CommandBatchService extends CommandAsyncService {
executed.set(true);
if (ex != null) {
for (Entry e : commands.values()) {
for (Entry e : aggregatedCommands.values()) {
e.getCommands().forEach(t -> t.tryFailure(ex));
}
promise.completeExceptionally(ex);
aggregatedCommands.clear();
commands.clear();
nestedServices.clear();
return;
}
aggregatedCommands.clear();
commands.clear();
nestedServices.clear();
promise.complete(new BatchResult<>(Collections.emptyList(), 0));
@ -277,19 +255,20 @@ public class CommandBatchService extends CommandAsyncService {
voidPromise.whenComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
for (Entry e : commands.values()) {
for (Entry e : aggregatedCommands.values()) {
e.getCommands().forEach(t -> t.tryFailure(ex));
}
promise.completeExceptionally(ex);
aggregatedCommands.clear();
commands.clear();
nestedServices.clear();
return;
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
for (Entry e : aggregatedCommands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
@ -321,12 +300,21 @@ public class CommandBatchService extends CommandAsyncService {
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
promise.complete(result);
aggregatedCommands.clear();
commands.clear();
nestedServices.clear();
});
}
AtomicInteger slots = new AtomicInteger(commands.size());
return execute(promise, voidPromise);
}
private CompletableFutureWrapper<BatchResult<?>> execute(CompletableFuture<BatchResult<?>> promise, CompletableFuture<Void> voidPromise) {
AtomicInteger attempt = new AtomicInteger();
CompletableFuture<Map<MasterSlaveEntry, Entry>> future = new CompletableFuture<>();
resolveCommands(attempt, future);
future.thenAccept(r -> {
AtomicInteger slots = new AtomicInteger(r.size());
for (Map.Entry<RFuture<?>, List<CommandBatchService>> entry : nestedServices.entrySet()) {
slots.incrementAndGet();
@ -339,14 +327,90 @@ public class CommandBatchService extends CommandAsyncService {
});
}
for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
for (Map.Entry<MasterSlaveEntry, Entry> e : r.entrySet()) {
if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) {
for (Entry entry : r.values()) {
BatchCommandData<?, ?> multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
entry.getCommands().addFirst(multiCommand);
BatchCommandData<?, ?> execCommand = new BatchCommandData(RedisCommands.EXEC, new Object[] {}, index.incrementAndGet());
entry.getCommands().add(execCommand);
}
}
if (this.options.isSkipResult()) {
for (Entry entry : r.values()) {
BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(offCommand);
BatchCommandData<?, ?> onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.getCommands().add(onCommand);
}
}
if (this.options.getSyncSlaves() > 0) {
for (Entry entry : r.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT,
new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet());
entry.getCommands().add(waitCommand);
}
}
BatchOptions options = BatchOptions.defaults()
.executionMode(this.options.getExecutionMode())
.syncSlaves(this.options.getSyncSlaves(), this.options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(this.options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(Math.max(0, this.options.getRetryAttempts() - attempt.get()))
.retryInterval(this.options.getRetryInterval(), TimeUnit.MILLISECONDS);
if (this.options.isSkipResult()) {
options.skipResult();
}
RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,
connectionManager, this.options, e.getValue(), slots, referenceType, false);
connectionManager, options, e.getValue(), slots, referenceType, false);
executor.execute();
}
});
return new CompletableFutureWrapper<>(promise);
}
private void resolveCommands(AtomicInteger attempt, CompletableFuture<Map<MasterSlaveEntry, Entry>> future) {
Map<MasterSlaveEntry, Entry> result = new HashMap<>();
for (Map.Entry<NodeSource, Entry> e : commands.entrySet()) {
MasterSlaveEntry entry = getEntry(e.getKey());
if (entry == null) {
if (attempt.incrementAndGet() == this.options.getRetryAttempts() + 1) {
future.completeExceptionally(connectionManager.getServiceManager().createNodeNotFoundException(e.getKey()));
return;
}
connectionManager.getServiceManager().newTimeout(task -> {
resolveCommands(attempt, future);
}, this.options.getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
Entry ee = result.computeIfAbsent(entry, k -> new Entry());
if (!e.getValue().isReadOnlyMode()) {
ee.setReadOnlyMode(false);
}
ee.getCommands().addAll(e.getValue().getCommands());
}
for (Entry entry : result.values()) {
List<BatchCommandData> list = new ArrayList<>(entry.getCommands());
Collections.sort(list);
entry.getCommands().clear();
entry.getCommands().addAll((Collection<? extends BatchCommandData<?, ?>>) (Object) list);
}
aggregatedCommands = result;
future.complete(result);
}
private MasterSlaveEntry getEntry(NodeSource source) {
if (source.getSlot() != null) {
return connectionManager.getWriteEntry(source.getSlot());
}
return source.getEntry();
}
protected Throwable cause(CompletableFuture<?> future) {
try {
future.getNow(null);
@ -406,10 +470,19 @@ public class CommandBatchService extends CommandAsyncService {
}
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> futures = new ArrayList<>(commands.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
RFuture<List<Object>> execPromise = async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()),
codec, RedisCommands.EXEC, new Object[] {}, false, false);
AtomicInteger attempt = new AtomicInteger();
CompletableFuture<Map<MasterSlaveEntry, Entry>> resolvedEntriesFuture = new CompletableFuture<>();
resolveCommands(attempt, resolvedEntriesFuture);
resolvedEntriesFuture.thenAccept(map -> {
List<CompletableFuture<Void>> futures = new ArrayList<>(map.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : aggregatedCommands.entrySet()) {
boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
CompletableFuture<List<Object>> execPromise = createPromise();
RedisExecutor<List<Object>, List<Object>> executor = new RedisQueuedBatchExecutor<>(isReadOnly, new NodeSource(entry.getKey()), codec,
RedisCommands.EXEC, new Object[] {}, execPromise,
false, connectionManager, objectBuilder, commands, connections,
options, index, executed, referenceType, false, aggregatedCommands);
executor.execute();
CompletionStage<Void> f = execPromise.thenCompose(r -> {
BatchCommandData<?, Integer> lastCommand = (BatchCommandData<?, Integer>) entry.getValue().getCommands().peekLast();
@ -433,7 +506,7 @@ public class CommandBatchService extends CommandAsyncService {
try {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : result.entrySet()) {
Entry commandEntry = commands.get(entry.getKey());
Entry commandEntry = aggregatedCommands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
@ -451,7 +524,7 @@ public class CommandBatchService extends CommandAsyncService {
}
List<BatchCommandData> entries = new ArrayList<>();
for (Entry e : commands.values()) {
for (Entry e : aggregatedCommands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
@ -476,6 +549,7 @@ public class CommandBatchService extends CommandAsyncService {
}
});
});
});
return new CompletableFutureWrapper<>(resultPromise);
}

@ -20,7 +20,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandBatchService.Entry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
@ -41,7 +40,7 @@ public class RedisBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R> {
@SuppressWarnings("ParameterNumber")
public RedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, CompletableFuture<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
RedissonObjectBuilder objectBuilder, ConcurrentMap<NodeSource, Entry> commands,
BatchOptions options, AtomicInteger index,
AtomicBoolean executed, RedissonObjectBuilder.ReferenceType referenceType, boolean noRetry) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,

@ -31,6 +31,7 @@ import org.redisson.misc.LogHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
@ -47,23 +48,37 @@ import java.util.concurrent.atomic.AtomicInteger;
public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R> {
private final ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections;
private final Map<MasterSlaveEntry, Entry> aggregatedCommands;
@SuppressWarnings("ParameterNumber")
public RedisQueuedBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, CompletableFuture<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
RedissonObjectBuilder objectBuilder, ConcurrentMap<NodeSource, Entry> commands,
ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections, BatchOptions options, AtomicInteger index,
AtomicBoolean executed, RedissonObjectBuilder.ReferenceType referenceType,
boolean noRetry) {
boolean noRetry, Map<MasterSlaveEntry, Entry> aggregatedCommands) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,
commands, options, index, executed, referenceType, noRetry);
this.aggregatedCommands = aggregatedCommands;
this.connections = connections;
}
@Override
public void execute() {
if (source.getEntry() != null) {
Entry entry = aggregatedCommands.computeIfAbsent(source.getEntry(), k -> new Entry());
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}
Codec codecToUse = getCodec(codec);
BatchCommandData<V, R> commandData = new BatchCommandData<>(mainPromise, codecToUse, command, null, index.incrementAndGet());
entry.getCommands().add(commandData);
} else {
addBatchCommandData(null);
}
if (!readOnlyMode && this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC) {
throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC");
@ -151,7 +166,7 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
connectionEntry.setFirstCommand(false);
} else {
if (RedisCommands.EXEC.getName().equals(command.getName())) {
Entry entry = commands.get(msEntry);
Entry entry = aggregatedCommands.get(msEntry);
List<CommandData<?, ?>> list = new ArrayList<>();

Loading…
Cancel
Save