diff --git a/src/main/java/org/redisson/client/RedisRedirectException.java b/src/main/java/org/redisson/client/RedisRedirectException.java index e2c30b761..9c15b87e5 100644 --- a/src/main/java/org/redisson/client/RedisRedirectException.java +++ b/src/main/java/org/redisson/client/RedisRedirectException.java @@ -18,7 +18,7 @@ package org.redisson.client; import java.net.InetSocketAddress; import java.net.URI; -class RedisRedirectException extends RedisException { +public class RedisRedirectException extends RedisException { private static final long serialVersionUID = 181505625075250011L; diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index d9eb1c1d2..260c21fb7 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -111,7 +111,7 @@ public class CommandDecoder extends ReplayingDecoder { decode(in, cmd, null, ctx.channel()); } } catch (IOException e) { - cmd.getPromise().tryFailure(e); + cmd.tryFailure(e); } } else if (data instanceof CommandsData) { CommandsData commands = (CommandsData)data; @@ -175,14 +175,10 @@ public class CommandDecoder extends ReplayingDecoder { decode(in, cmd, null, ctx.channel()); i++; } catch (IOException e) { - cmd.getPromise().tryFailure(e); + cmd.tryFailure(e); } - if (!cmd.getPromise().isSuccess()) { - if (!(cmd.getPromise().cause() instanceof RedisMovedException - || cmd.getPromise().cause() instanceof RedisAskException - || cmd.getPromise().cause() instanceof RedisLoadingException)) { - error = (RedisException) cmd.getPromise().cause(); - } + if (!cmd.isSuccess()) { + error = (RedisException) cmd.cause(); } } @@ -197,7 +193,7 @@ public class CommandDecoder extends ReplayingDecoder { log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); } } - + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); state(null); @@ -222,24 +218,24 @@ public class CommandDecoder extends ReplayingDecoder { String[] errorParts = error.split(" "); int slot = Integer.valueOf(errorParts[1]); String addr = errorParts[2]; - data.getPromise().tryFailure(new RedisMovedException(slot, addr)); + data.tryFailure(new RedisMovedException(slot, addr)); } else if (error.startsWith("ASK")) { String[] errorParts = error.split(" "); int slot = Integer.valueOf(errorParts[1]); String addr = errorParts[2]; - data.getPromise().tryFailure(new RedisAskException(slot, addr)); + data.tryFailure(new RedisAskException(slot, addr)); } else if (error.startsWith("LOADING")) { - data.getPromise().tryFailure(new RedisLoadingException(error + data.tryFailure(new RedisLoadingException(error + ". channel: " + channel + " data: " + data)); } else if (error.startsWith("OOM")) { - data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("OOM ")[1] + data.tryFailure(new RedisOutOfMemoryException(error.split("OOM ")[1] + ". channel: " + channel + " data: " + data)); } else if (error.contains("-OOM ")) { - data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1] + data.tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1] + ". channel: " + channel + " data: " + data)); } else { if (data != null) { - data.getPromise().tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data)); + data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data)); } else { log.error("Error: {} channel: {} data: {}", error, channel, data); } @@ -346,7 +342,7 @@ public class CommandDecoder extends ReplayingDecoder { if (parts != null) { parts.add(result); } else { - if (!data.getPromise().trySuccess(result) && data.getPromise().cause() instanceof RedisTimeoutException) { + if (!data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) { log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result); } } diff --git a/src/main/java/org/redisson/client/handler/StateLevel.java b/src/main/java/org/redisson/client/handler/StateLevel.java index 37f2fbc4a..64b0d9186 100644 --- a/src/main/java/org/redisson/client/handler/StateLevel.java +++ b/src/main/java/org/redisson/client/handler/StateLevel.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.client.handler; import java.util.List; diff --git a/src/main/java/org/redisson/client/protocol/BatchCommandData.java b/src/main/java/org/redisson/client/protocol/BatchCommandData.java new file mode 100644 index 000000000..70ce32f84 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/BatchCommandData.java @@ -0,0 +1,62 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.client.RedisRedirectException; +import org.redisson.client.codec.Codec; + +import io.netty.util.concurrent.Promise; + +public class BatchCommandData extends CommandData { + + private final AtomicReference redirectError = new AtomicReference(); + + public BatchCommandData(Promise promise, Codec codec, RedisCommand command, Object[] params) { + super(promise, codec, command, params); + } + + @Override + public boolean tryFailure(Throwable cause) { + if (redirectError.get() != null) { + return false; + } + if (cause instanceof RedisRedirectException) { + return redirectError.compareAndSet(null, (RedisRedirectException) cause); + } + + return super.tryFailure(cause); + } + + @Override + public boolean isSuccess() { + return redirectError.get() == null && super.isSuccess(); + } + + @Override + public Throwable cause() { + if (redirectError.get() != null) { + return redirectError.get(); + } + return super.cause(); + } + + public void clearError() { + redirectError.set(null); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/CommandData.java b/src/main/java/org/redisson/client/protocol/CommandData.java index a47227c9e..7cafdedab 100644 --- a/src/main/java/org/redisson/client/protocol/CommandData.java +++ b/src/main/java/org/redisson/client/protocol/CommandData.java @@ -59,6 +59,18 @@ public class CommandData implements QueueCommand { public Promise getPromise() { return promise; } + + public Throwable cause() { + return promise.cause(); + } + + public boolean isSuccess() { + return promise.isSuccess(); + } + + public boolean tryFailure(Throwable cause) { + return promise.tryFailure(cause); + } public Codec getCodec() { return codec; diff --git a/src/main/java/org/redisson/client/protocol/decoder/DecoderState.java b/src/main/java/org/redisson/client/protocol/decoder/DecoderState.java index c3abf7502..1c9c78afa 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/DecoderState.java +++ b/src/main/java/org/redisson/client/protocol/decoder/DecoderState.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.client.protocol.decoder; public interface DecoderState { diff --git a/src/main/java/org/redisson/command/CommandBatchService.java b/src/main/java/org/redisson/command/CommandBatchService.java index 6bf4083cb..3939f1190 100644 --- a/src/main/java/org/redisson/command/CommandBatchService.java +++ b/src/main/java/org/redisson/command/CommandBatchService.java @@ -31,6 +31,7 @@ import org.redisson.client.RedisTimeoutException; import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.BatchCommandData; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; @@ -52,16 +53,16 @@ public class CommandBatchService extends CommandReactiveService { public static class CommandEntry implements Comparable { - final CommandData command; + final BatchCommandData command; final int index; - public CommandEntry(CommandData command, int index) { + public CommandEntry(BatchCommandData command, int index) { super(); this.command = command; this.index = index; } - public CommandData getCommand() { + public BatchCommandData getCommand() { return command; } @@ -89,6 +90,12 @@ public class CommandBatchService extends CommandReactiveService { public boolean isReadOnlyMode() { return readOnlyMode; } + + public void clearErrors() { + for (CommandEntry commandEntry : commands) { + commandEntry.getCommand().clearError(); + } + } } @@ -96,7 +103,7 @@ public class CommandBatchService extends CommandReactiveService { private ConcurrentMap commands = PlatformDependent.newConcurrentHashMap(); - private boolean executed; + private volatile boolean executed; public CommandBatchService(ConnectionManager connectionManager) { super(connectionManager); @@ -106,7 +113,7 @@ public class CommandBatchService extends CommandReactiveService { protected void async(boolean readOnlyMode, NodeSource nodeSource, Codec codec, RedisCommand command, Object[] params, Promise mainPromise, int attempt) { if (executed) { - throw new IllegalStateException("Batch already executed!"); + throw new IllegalStateException("Batch already has been executed!"); } Entry entry = commands.get(nodeSource.getSlot()); if (entry == null) { @@ -120,7 +127,9 @@ public class CommandBatchService extends CommandReactiveService { if (!readOnlyMode) { entry.setReadOnlyMode(false); } - entry.getCommands().add(new CommandEntry(new CommandData(mainPromise, codec, command, params), index.incrementAndGet())); + + BatchCommandData commandData = new BatchCommandData(mainPromise, codec, command, params); + entry.getCommands().add(new CommandEntry(commandData, index.incrementAndGet())); } public List execute() { @@ -278,15 +287,18 @@ public class CommandBatchService extends CommandReactiveService { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); + entry.clearErrors(); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt); return; } if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException)future.cause(); + entry.clearErrors(); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt); return; } if (future.cause() instanceof RedisLoadingException) { + entry.clearErrors(); execute(entry, source, mainPromise, slots, attempt); return; }