TRYAGAIN error handling. #612

pull/614/head
Nikita 9 years ago
parent 8aac5ce463
commit e4909f16d2

@ -0,0 +1,31 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisTryAgainException extends RedisException {
private static final long serialVersionUID = -2565335188503354660L;
public RedisTryAgainException(String message) {
super(message);
}
}

@ -29,6 +29,7 @@ import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
@ -240,6 +241,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2];
data.tryFailure(new RedisAskException(slot, addr));
} else if (error.startsWith("TRYAGAIN")) {
data.tryFailure(new RedisTryAgainException(error
+ ". channel: " + channel + " data: " + data));
} else if (error.startsWith("LOADING")) {
data.tryFailure(new RedisLoadingException(error
+ ". channel: " + channel + " data: " + data));

@ -37,6 +37,7 @@ import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
@ -741,6 +742,19 @@ public class CommandAsyncService implements CommandAsyncExecutor {
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
}
}, 1, TimeUnit.SECONDS);
AsyncDetails.release(details);
return;
}
if (future.isSuccess()) {
R res = future.getNow();

@ -30,6 +30,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
@ -299,6 +300,17 @@ public class CommandBatchService extends CommandReactiveService {
execute(entry, source, mainPromise, slots, attempt);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
entry.clearErrors();
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
execute(entry, source, mainPromise, slots, attempt);
}
}, 1, TimeUnit.SECONDS);
return;
}
if (future.isSuccess()) {
if (slots.decrementAndGet() == 0) {

Loading…
Cancel
Save