Command batches cluster redirect handling fixed

pull/509/head
Nikita 9 years ago
parent b60b170d45
commit 0d59eaea3c

@ -18,7 +18,7 @@ package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI;
class RedisRedirectException extends RedisException {
public class RedisRedirectException extends RedisException {
private static final long serialVersionUID = 181505625075250011L;

@ -111,7 +111,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decode(in, cmd, null, ctx.channel());
}
} catch (IOException e) {
cmd.getPromise().tryFailure(e);
cmd.tryFailure(e);
}
} else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data;
@ -175,14 +175,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decode(in, cmd, null, ctx.channel());
i++;
} catch (IOException e) {
cmd.getPromise().tryFailure(e);
cmd.tryFailure(e);
}
if (!cmd.getPromise().isSuccess()) {
if (!(cmd.getPromise().cause() instanceof RedisMovedException
|| cmd.getPromise().cause() instanceof RedisAskException
|| cmd.getPromise().cause() instanceof RedisLoadingException)) {
error = (RedisException) cmd.getPromise().cause();
}
if (!cmd.isSuccess()) {
error = (RedisException) cmd.cause();
}
}
@ -197,7 +193,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
}
}
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());
state(null);
@ -222,24 +218,24 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2];
data.getPromise().tryFailure(new RedisMovedException(slot, addr));
data.tryFailure(new RedisMovedException(slot, addr));
} else if (error.startsWith("ASK")) {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2];
data.getPromise().tryFailure(new RedisAskException(slot, addr));
data.tryFailure(new RedisAskException(slot, addr));
} else if (error.startsWith("LOADING")) {
data.getPromise().tryFailure(new RedisLoadingException(error
data.tryFailure(new RedisLoadingException(error
+ ". channel: " + channel + " data: " + data));
} else if (error.startsWith("OOM")) {
data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("OOM ")[1]
data.tryFailure(new RedisOutOfMemoryException(error.split("OOM ")[1]
+ ". channel: " + channel + " data: " + data));
} else if (error.contains("-OOM ")) {
data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1]
data.tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1]
+ ". channel: " + channel + " data: " + data));
} else {
if (data != null) {
data.getPromise().tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
} else {
log.error("Error: {} channel: {} data: {}", error, channel, data);
}
@ -346,7 +342,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (parts != null) {
parts.add(result);
} else {
if (!data.getPromise().trySuccess(result) && data.getPromise().cause() instanceof RedisTimeoutException) {
if (!data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result);
}
}

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.handler;
import java.util.List;

@ -0,0 +1,62 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec;
import io.netty.util.concurrent.Promise;
public class BatchCommandData<T, R> extends CommandData<T, R> {
private final AtomicReference<RedisRedirectException> redirectError = new AtomicReference<RedisRedirectException>();
public BatchCommandData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
super(promise, codec, command, params);
}
@Override
public boolean tryFailure(Throwable cause) {
if (redirectError.get() != null) {
return false;
}
if (cause instanceof RedisRedirectException) {
return redirectError.compareAndSet(null, (RedisRedirectException) cause);
}
return super.tryFailure(cause);
}
@Override
public boolean isSuccess() {
return redirectError.get() == null && super.isSuccess();
}
@Override
public Throwable cause() {
if (redirectError.get() != null) {
return redirectError.get();
}
return super.cause();
}
public void clearError() {
redirectError.set(null);
}
}

@ -59,6 +59,18 @@ public class CommandData<T, R> implements QueueCommand {
public Promise<R> getPromise() {
return promise;
}
public Throwable cause() {
return promise.cause();
}
public boolean isSuccess() {
return promise.isSuccess();
}
public boolean tryFailure(Throwable cause) {
return promise.tryFailure(cause);
}
public Codec getCodec() {
return codec;

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
public interface DecoderState {

@ -31,6 +31,7 @@ import org.redisson.client.RedisTimeoutException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.BatchCommandData;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
@ -52,16 +53,16 @@ public class CommandBatchService extends CommandReactiveService {
public static class CommandEntry implements Comparable<CommandEntry> {
final CommandData<?, ?> command;
final BatchCommandData<?, ?> command;
final int index;
public CommandEntry(CommandData<?, ?> command, int index) {
public CommandEntry(BatchCommandData<?, ?> command, int index) {
super();
this.command = command;
this.index = index;
}
public CommandData<?, ?> getCommand() {
public BatchCommandData<?, ?> getCommand() {
return command;
}
@ -89,6 +90,12 @@ public class CommandBatchService extends CommandReactiveService {
public boolean isReadOnlyMode() {
return readOnlyMode;
}
public void clearErrors() {
for (CommandEntry commandEntry : commands) {
commandEntry.getCommand().clearError();
}
}
}
@ -96,7 +103,7 @@ public class CommandBatchService extends CommandReactiveService {
private ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();
private boolean executed;
private volatile boolean executed;
public CommandBatchService(ConnectionManager connectionManager) {
super(connectionManager);
@ -106,7 +113,7 @@ public class CommandBatchService extends CommandReactiveService {
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
throw new IllegalStateException("Batch already has been executed!");
}
Entry entry = commands.get(nodeSource.getSlot());
if (entry == null) {
@ -120,7 +127,9 @@ public class CommandBatchService extends CommandReactiveService {
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}
entry.getCommands().add(new CommandEntry(new CommandData<V, R>(mainPromise, codec, command, params), index.incrementAndGet()));
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codec, command, params);
entry.getCommands().add(new CommandEntry(commandData, index.incrementAndGet()));
}
public List<?> execute() {
@ -278,15 +287,18 @@ public class CommandBatchService extends CommandReactiveService {
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt);
return;
}
if (future.cause() instanceof RedisLoadingException) {
entry.clearErrors();
execute(entry, source, mainPromise, slots, attempt);
return;
}

Loading…
Cancel
Save