@ -44,14 +44,11 @@ import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisOutOfMemoryException ;
import org.redisson.client.RedisTimeoutException ;
import org.redisson.client.RedisTryAgainException ;
import org.redisson.client.codec.BaseCodec ;
import org.redisson.client.codec.ByteArrayCodec ;
import org.redisson.client.codec.Codec ;
import org.redisson.client.codec.StringCodec ;
import org.redisson.client.protocol.CommandData ;
import org.redisson.client.protocol.CommandsData ;
import org.redisson.client.protocol.Decoder ;
import org.redisson.client.protocol.Encoder ;
import org.redisson.client.protocol.QueueCommand ;
import org.redisson.client.protocol.RedisCommand ;
import org.redisson.client.protocol.RedisCommand.ValueType ;
@ -63,7 +60,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory ;
import io.netty.buffer.ByteBuf ;
import io.netty.buffer.ByteBufAllocator ;
import io.netty.channel.Channel ;
import io.netty.channel.ChannelHandlerContext ;
import io.netty.handler.codec.ReplayingDecoder ;
@ -78,30 +74,6 @@ import io.netty.util.concurrent.FastThreadLocal;
* /
public class CommandDecoder extends ReplayingDecoder < State > {
public static class NullCodec extends BaseCodec {
public static final NullCodec INSTANCE = new NullCodec ( ) ;
private final Decoder < Object > decoder = new Decoder < Object > ( ) {
@Override
public Object decode ( ByteBuf buf , State state ) {
return new Object ( ) ;
}
} ;
@Override
public Decoder < Object > getValueDecoder ( ) {
return decoder ;
}
@Override
public Encoder getValueEncoder ( ) {
throw new UnsupportedOperationException ( ) ;
}
}
final Logger log = LoggerFactory . getLogger ( getClass ( ) ) ;
private static final char CR = '\r' ;
@ -113,13 +85,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
final ExecutorService executor ;
private final boolean decodeInExecutor ;
private final FastThreadLocal < Status > decoderStatus = new FastThreadLocal < Status > ( ) {
@Override
protected Status initialValue ( ) {
return Status . NORMAL ;
} ;
} ;
private final FastThreadLocal < State > state = new FastThreadLocal < State > ( ) ;
public CommandDecoder ( ExecutorService executor , boolean decodeInExecutor ) {
@ -128,57 +93,116 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
@Override
protected void decode ( ChannelHandlerContext ctx , ByteBuf in , List < Object > out ) throws Exception {
final QueueCommand data = ctx . channel ( ) . attr ( CommandsQueue . CURRENT_COMMAND ) . get ( ) ;
protected final void decode ( ChannelHandlerContext ctx , ByteBuf in , List < Object > out ) throws Exception {
QueueCommand data = ctx . channel ( ) . attr ( CommandsQueue . CURRENT_COMMAND ) . get ( ) ;
if ( log . isTraceEnabled ( ) ) {
log . trace ( "reply: {}, channel: {}, command: {}" , in . toString ( 0 , in . writerIndex ( ) , CharsetUtil . UTF_8 ) , ctx . channel ( ) , data ) ;
}
if ( state . get ( ) = = null ) {
state . set ( new State ( ) ) ;
}
state . get ( ) . setDecoderState ( null ) ;
in . markReaderIndex ( ) ;
decodeCommand ( ctx . channel ( ) , in , data ) ;
if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
if ( data = = null ) {
while ( in . writerIndex ( ) > in . readerIndex ( ) ) {
in . markReaderIndex ( ) ;
skipCommand ( in ) ;
in . resetReaderIndex ( ) ;
decode ( ctx , in , data ) ;
}
} else {
in . markReaderIndex ( ) ;
if ( data instanceof CommandsData ) {
CommandsData cmd = ( CommandsData ) data ;
if ( ! cmd . isSkipResult ( ) ) {
for ( int j = 0 ; j < cmd . getCommands ( ) . size ( ) ; j + + ) {
skipCommand ( in ) ;
}
}
} else {
skipCommand ( in ) ;
}
in . resetReaderIndex ( ) ;
final ByteBuf copy = ByteBufAllocator . DEFAULT . buffer ( in . writerIndex ( ) ) ;
in . readBytes ( copy ) ;
state . set ( null ) ;
decoderStatus . set ( Status . NORMAL ) ;
final Channel channel = ctx . channel ( ) ;
decode ( ctx , in , data ) ;
}
}
private void decode ( ChannelHandlerContext ctx , ByteBuf in , QueueCommand data ) throws Exception {
if ( decodeInExecutor ) {
ByteBuf copy = in . copy ( in . readerIndex ( ) , in . writerIndex ( ) - in . readerIndex ( ) ) ;
in . skipBytes ( in . writerIndex ( ) - in . readerIndex ( ) ) ;
executor . execute ( ( ) - > {
decoderStatus . set ( Status . DECODE_BUFFER ) ;
state . set ( new State ( ) ) ;
state . get ( ) . setDecoderState ( null ) ;
try {
decodeCommand ( channel , copy , data ) ;
decodeCommand ( c tx. c hannel( ) , copy , data ) ;
} catch ( Exception e ) {
log . error ( "Unable to decode data in separate thread: " + LogHelper . toString ( data ) , e ) ;
} finally {
copy . release ( ) ;
decoderStatus . remove ( ) ;
state . remove ( ) ;
}
} ) ;
} else {
decodeCommand ( ctx . channel ( ) , in , data ) ;
}
}
protected void sendNext ( Channel channel , QueueCommand data ) {
if ( data ! = null ) {
if ( d ecoderStatus. get ( ) = = Status . FILL_BUFFER | | d ata. isExecuted ( ) ) {
if ( d ata. isExecuted ( ) ) {
sendNext ( channel ) ;
}
} else {
sendNext ( channel ) ;
}
}
protected void skipCommand ( ByteBuf in ) throws Exception {
skipDecode ( in ) ;
}
protected void skipDecode ( ByteBuf in ) throws IOException {
int code = in . readByte ( ) ;
if ( code = = '+' ) {
skipString ( in ) ;
} else if ( code = = '-' ) {
skipString ( in ) ;
} else if ( code = = ':' ) {
skipString ( in ) ;
} else if ( code = = '$' ) {
skipBytes ( in ) ;
} else if ( code = = '*' ) {
long size = readLong ( in ) ;
for ( int i = 0 ; i < size ; i + + ) {
skipDecode ( in ) ;
}
}
}
private void skipBytes ( ByteBuf is ) throws IOException {
long l = readLong ( is ) ;
if ( l > Integer . MAX_VALUE ) {
throw new IllegalArgumentException (
"Java only supports arrays up to " + Integer . MAX_VALUE + " in size" ) ;
}
int size = ( int ) l ;
if ( size = = - 1 ) {
return ;
}
is . skipBytes ( size + 2 ) ;
}
private void skipString ( ByteBuf in ) {
int len = in . bytesBefore ( ( byte ) '\r' ) ;
in . skipBytes ( len + 2 ) ;
}
protected void decodeCommand ( Channel channel , ByteBuf in , QueueCommand data ) throws Exception {
if ( data instanceof CommandData ) {
CommandData < Object , Object > cmd = ( CommandData < Object , Object > ) data ;
@ -188,7 +212,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} catch ( Exception e ) {
log . error ( "Unable to decode data. channel: " + channel + ", reply: " + LogHelper . toString ( in ) + ", command: " + LogHelper . toString ( data ) , e ) ;
cmd . tryFailure ( e ) ;
decoderStatus . set ( Status . NORMAL ) ;
sendNext ( channel ) ;
throw e ;
}
@ -198,7 +221,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decodeCommandBatch ( channel , in , data , commands ) ;
} catch ( Exception e ) {
commands . getPromise ( ) . tryFailure ( e ) ;
decoderStatus . set ( Status . NORMAL ) ;
sendNext ( channel ) ;
throw e ;
}
@ -210,7 +232,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
sendNext ( channel ) ;
} catch ( Exception e ) {
log . error ( "Unable to decode data. channel: " + channel + ", reply: " + LogHelper . toString ( in ) , e ) ;
decoderStatus . set ( Status . NORMAL ) ;
sendNext ( channel ) ;
throw e ;
}
@ -218,27 +239,19 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
protected void sendNext ( Channel channel ) {
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
channel . pipeline ( ) . get ( CommandsQueue . class ) . sendNextCommand ( channel ) ;
state . set ( null ) ;
}
channel . pipeline ( ) . get ( CommandsQueue . class ) . sendNextCommand ( channel ) ;
state . set ( null ) ;
}
private void decodeCommandBatch ( Channel channel , ByteBuf in , QueueCommand data ,
CommandsData commandBatch ) throws Exception {
int i = 0 ;
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
i = state . get ( ) . getBatchIndex ( ) ;
}
int i = state . get ( ) . getBatchIndex ( ) ;
Throwable error = null ;
while ( in . writerIndex ( ) > in . readerIndex ( ) ) {
CommandData < Object , Object > commandData = null ;
try {
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
checkpoint ( ) ;
state . get ( ) . setBatchIndex ( i ) ;
}
state . get ( ) . setBatchIndex ( i ) ;
RedisCommand < ? > cmd = commandBatch . getCommands ( ) . get ( i ) . getCommand ( ) ;
boolean skipConvertor = commandBatch . isQueued ( ) ;
List < CommandData < ? , ? > > commandsData = null ;
@ -291,25 +304,20 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
if ( commandBatch . isSkipResult ( ) | | i = = commandBatch . getCommands ( ) . size ( ) ) {
if ( decoderStatus . get ( ) ! = Status . FILL_BUFFER ) {
RPromise < Void > promise = commandBatch . getPromise ( ) ;
if ( error ! = null ) {
if ( ! promise . tryFailure ( error ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , channel , LogHelper . toString ( data ) ) ;
}
} else {
if ( ! promise . trySuccess ( null ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , channel , LogHelper . toString ( data ) ) ;
}
RPromise < Void > promise = commandBatch . getPromise ( ) ;
if ( error ! = null ) {
if ( ! promise . tryFailure ( error ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , channel , LogHelper . toString ( data ) ) ;
}
} else {
if ( ! promise . trySuccess ( null ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , channel , LogHelper . toString ( data ) ) ;
}
}
sendNext ( channel ) ;
} else {
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
checkpoint ( ) ;
state . get ( ) . setBatchIndex ( i ) ;
}
state . get ( ) . setBatchIndex ( i ) ;
}
}
@ -384,7 +392,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
in . skipBytes ( len + 2 ) ;
return result ;
}
@SuppressWarnings ( "unchecked" )
private void decodeList ( ByteBuf in , CommandData < Object , Object > data , List < Object > parts ,
Channel channel , long size , List < Object > respParts , boolean skipConvertor , List < CommandData < ? , ? > > commandsData )
@ -412,10 +420,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return ;
}
if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
return ;
}
Object result = decoder . decode ( respParts , state . get ( ) ) ;
decodeResult ( data , parts , channel , result ) ;
}
@ -428,7 +432,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
private void handleResult ( CommandData < Object , Object > data , List < Object > parts , Object result , boolean skipConvertor , Channel channel ) {
if ( data ! = null & & ! skipConvertor & & decoderStatus . get ( ) ! = Status . FILL_BUFFER ) {
if ( data ! = null & & ! skipConvertor ) {
result = data . getCommand ( ) . getConvertor ( ) . convert ( result ) ;
}
if ( parts ! = null ) {
@ -439,10 +443,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
protected void completeResponse ( CommandData < Object , Object > data , Object result , Channel channel ) {
if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
return ;
}
if ( data ! = null & & ! data . getPromise ( ) . trySuccess ( result ) & & data . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}, result: {}" , channel , LogHelper . toString ( data ) , LogHelper . toString ( result ) ) ;
}
@ -473,15 +473,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
Codec codec = data . getCodec ( ) ;
if ( decodeInExecutor & & ! ( codec instanceof StringCodec | | codec instanceof ByteArrayCodec ) ) {
if ( decoderStatus . get ( ) = = Status . NORMAL ) {
decoderStatus . set ( Status . FILL_BUFFER ) ;
codec = NullCodec . INSTANCE ;
} else if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
codec = NullCodec . INSTANCE ;
}
}
Decoder < Object > decoder = data . getCommand ( ) . getReplayDecoder ( ) ;
if ( decoder = = null ) {
if ( codec = = null ) {