@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.ArrayList ;
import java.util.Iterator ;
import java.util.List ;
import java.util.concurrent.ExecutorService ;
import org.redisson.client.RedisAskException ;
import org.redisson.client.RedisException ;
@ -43,10 +44,14 @@ import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisOutOfMemoryException ;
import org.redisson.client.RedisTimeoutException ;
import org.redisson.client.RedisTryAgainException ;
import org.redisson.client.codec.BaseCodec ;
import org.redisson.client.codec.ByteArrayCodec ;
import org.redisson.client.codec.Codec ;
import org.redisson.client.codec.StringCodec ;
import org.redisson.client.protocol.CommandData ;
import org.redisson.client.protocol.CommandsData ;
import org.redisson.client.protocol.Decoder ;
import org.redisson.client.protocol.Encoder ;
import org.redisson.client.protocol.QueueCommand ;
import org.redisson.client.protocol.RedisCommand ;
import org.redisson.client.protocol.RedisCommand.ValueType ;
@ -58,6 +63,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory ;
import io.netty.buffer.ByteBuf ;
import io.netty.buffer.ByteBufAllocator ;
import io.netty.channel.Channel ;
import io.netty.channel.ChannelHandlerContext ;
import io.netty.handler.codec.ReplayingDecoder ;
@ -70,131 +76,174 @@ import io.netty.util.CharsetUtil;
*
* /
public class CommandDecoder extends ReplayingDecoder < State > {
public static class NullCodec extends BaseCodec {
public static final NullCodec INSTANCE = new NullCodec ( ) ;
private final Decoder < Object > decoder = new Decoder < Object > ( ) {
@Override
public Object decode ( ByteBuf buf , State state ) {
return new Object ( ) ;
}
} ;
@Override
public Decoder < Object > getValueDecoder ( ) {
return decoder ;
}
@Override
public Encoder getValueEncoder ( ) {
throw new UnsupportedOperationException ( ) ;
}
}
protected final Logger log = LoggerFactory . getLogger ( getClass ( ) ) ;
private static final char CR = '\r' ;
private static final char LF = '\n' ;
private static final char ZERO = '0' ;
public enum Status { NORMAL , FILL_BUFFER , DECODE_BUFFER }
final ExecutorService executor ;
private final boolean decodeInExecutor ;
private final ThreadLocal < Status > decoderStatus = new ThreadLocal < Status > ( ) {
@Override
protected Status initialValue ( ) {
return Status . NORMAL ;
} ;
} ;
private final ThreadLocal < State > state = new ThreadLocal < State > ( ) ;
public CommandDecoder ( ExecutorService executor , boolean decodeInExecutor ) {
this . decodeInExecutor = decodeInExecutor ;
this . executor = executor ;
}
@Override
protected void decode ( ChannelHandlerContext ctx , ByteBuf in , List < Object > out ) throws Exception {
QueueCommand data = ctx . channel ( ) . attr ( CommandsQueue . CURRENT_COMMAND ) . get ( ) ;
final QueueCommand data = ctx . channel ( ) . attr ( CommandsQueue . CURRENT_COMMAND ) . get ( ) ;
if ( log . isTraceEnabled ( ) ) {
log . trace ( "reply: {}, channel: {}, command: {}" , in . toString ( 0 , in . writerIndex ( ) , CharsetUtil . UTF_8 ) , ctx . channel ( ) , data ) ;
}
if ( state ( ) = = null ) {
boolean makeCheckpoint = false ;
// commented out due to https://github.com/redisson/redisson/issues/1632. Reproduced with RedissonMapCacheTest
//
// boolean makeCheckpoint = data != null;
// if (data != null) {
// if (data instanceof CommandsData) {
// makeCheckpoint = false;
// } else {
// CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
// MultiDecoder<Object> decoder = cmd.getCommand().getReplayMultiDecoder();
// if (decoder != null
// && (decoder instanceof SlotsDecoder
// || decoder instanceof ListMultiDecoder)) {
// makeCheckpoint = false;
// }
// }
// }
state ( new State ( makeCheckpoint ) ) ;
if ( state . get ( ) = = null ) {
state . set ( new State ( ) ) ;
}
state () . setDecoderState ( null ) ;
state . get ( ) . setDecoderState ( null ) ;
decodeCommand ( ctx , in , data ) ;
in . markReaderIndex ( ) ;
decodeCommand ( ctx . channel ( ) , in , data ) ;
if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
in . resetReaderIndex ( ) ;
final ByteBuf copy = ByteBufAllocator . DEFAULT . buffer ( in . writerIndex ( ) ) ;
in . readBytes ( copy ) ;
state . set ( null ) ;
decoderStatus . set ( Status . NORMAL ) ;
final Channel channel = ctx . channel ( ) ;
executor . execute ( new Runnable ( ) {
@Override
public void run ( ) {
decoderStatus . set ( Status . DECODE_BUFFER ) ;
state . set ( new State ( ) ) ;
state . get ( ) . setDecoderState ( null ) ;
try {
decodeCommand ( channel , copy , data ) ;
} catch ( Exception e ) {
log . error ( "Unable to decode data in separate thread: " + LogHelper . toString ( data ) , e ) ;
} finally {
copy . release ( ) ;
decoderStatus . remove ( ) ;
state . remove ( ) ;
}
}
} ) ;
}
}
protected void sendNext ( ChannelHandlerContext ctx , QueueCommand data ) {
protected void sendNext ( Channel channel , QueueCommand data ) {
if ( data ! = null ) {
if ( data . isExecuted ( ) ) {
sendNext ( ctx ) ;
if ( d ecoderStatus. get ( ) = = Status . FILL_BUFFER | | d ata. isExecuted ( ) ) {
sendNext ( c hannel ) ;
}
} else {
sendNext ( ctx ) ;
sendNext ( c hannel ) ;
}
}
protected void decodeCommand ( ChannelHandlerContext ctx , ByteBuf in , QueueCommand data ) throws Exception {
protected void decodeCommand ( Channel channel , ByteBuf in , QueueCommand data ) throws Exception {
if ( data instanceof CommandData ) {
CommandData < Object , Object > cmd = ( CommandData < Object , Object > ) data ;
try {
if ( state ( ) . isMakeCheckpoint ( ) ) {
decodeFromCheckpoint ( ctx , in , data , cmd ) ;
} else {
decode ( in , cmd , null , ctx , false ) ;
}
sendNext ( ctx , data ) ;
decode ( in , cmd , null , channel , false , null ) ;
sendNext ( channel , data ) ;
} catch ( Exception e ) {
log . error ( "Unable to decode data. channel: " + ctx . channel ( ) + ", reply: " + LogHelper . toString ( in ) + ", command: " + LogHelper . toString ( data ) , e ) ;
log . error ( "Unable to decode data. channel: " + channel + ", reply: " + LogHelper . toString ( in ) + ", command: " + LogHelper . toString ( data ) , e ) ;
cmd . tryFailure ( e ) ;
sendNext ( ctx ) ;
decoderStatus . set ( Status . NORMAL ) ;
sendNext ( channel ) ;
throw e ;
}
} else if ( data instanceof CommandsData ) {
CommandsData commands = ( CommandsData ) data ;
try {
decodeCommandBatch ( c tx , in , data , commands ) ;
decodeCommandBatch ( c hannel , in , data , commands ) ;
} catch ( Exception e ) {
commands . getPromise ( ) . tryFailure ( e ) ;
sendNext ( ctx ) ;
decoderStatus . set ( Status . NORMAL ) ;
sendNext ( channel ) ;
throw e ;
}
} else {
try {
while ( in . writerIndex ( ) > in . readerIndex ( ) ) {
decode ( in , null , null , c tx, false ) ;
decode ( in , null , null , c hannel, false , null ) ;
}
sendNext ( c tx ) ;
sendNext ( c hannel ) ;
} catch ( Exception e ) {
log . error ( "Unable to decode data. channel: " + ctx . channel ( ) + ", reply: " + LogHelper . toString ( in ) , e ) ;
sendNext ( ctx ) ;
log . error ( "Unable to decode data. channel: " + channel + ", reply: " + LogHelper . toString ( in ) , e ) ;
decoderStatus . set ( Status . NORMAL ) ;
sendNext ( channel ) ;
throw e ;
}
}
}
protected void sendNext ( ChannelHandlerContext ctx ) {
ctx . pipeline ( ) . get ( CommandsQueue . class ) . sendNextCommand ( ctx . channel ( ) ) ;
state ( null ) ;
}
protected void decodeFromCheckpoint ( ChannelHandlerContext ctx , ByteBuf in , QueueCommand data ,
CommandData < Object , Object > cmd ) throws IOException {
StateLevel level = state ( ) . getLastLevel ( ) ;
List < Object > prevParts = null ;
if ( state ( ) . getLevels ( ) . size ( ) > 1 ) {
StateLevel prevLevel = state ( ) . getLevels ( ) . get ( state ( ) . getLevel ( ) - 1 ) ;
prevParts = prevLevel . getParts ( ) ;
}
decodeList ( in , cmd , prevParts , ctx , level . getSize ( ) , level . getParts ( ) , false ) ;
if ( state ( ) . getLastLevel ( ) = = level ) {
state ( ) . removeLastLevel ( ) ;
protected void sendNext ( Channel channel ) {
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
channel . pipeline ( ) . get ( CommandsQueue . class ) . sendNextCommand ( channel ) ;
state . set ( null ) ;
}
}
ThreadLocal < List < CommandData < ? , ? > > > commandsData = new ThreadLocal < List < CommandData < ? , ? > > > ( ) ;
private void decodeCommandBatch ( Channel HandlerContext ctx , ByteBuf in , QueueCommand data ,
private void decodeCommandBatch ( Channel channel , ByteBuf in , QueueCommand data ,
CommandsData commandBatch ) throws Exception {
int i = state ( ) . getBatchIndex ( ) ;
int i = 0 ;
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
i = state . get ( ) . getBatchIndex ( ) ;
}
Throwable error = null ;
while ( in . writerIndex ( ) > in . readerIndex ( ) ) {
CommandData < Object , Object > commandData = null ;
try {
checkpoint ( ) ;
state ( ) . setBatchIndex ( i ) ;
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
checkpoint ( ) ;
state . get ( ) . setBatchIndex ( i ) ;
}
RedisCommand < ? > cmd = commandBatch . getCommands ( ) . get ( i ) . getCommand ( ) ;
boolean skipConvertor = commandBatch . isQueued ( ) ;
List < CommandData < ? , ? > > commandsData = null ;
if ( ! commandBatch . isAtomic ( )
| | RedisCommands . EXEC . getName ( ) . equals ( cmd . getName ( ) )
| | RedisCommands . WAIT . getName ( ) . equals ( cmd . getName ( ) ) ) {
@ -202,20 +251,14 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if ( RedisCommands . EXEC . getName ( ) . equals ( cmd . getName ( ) ) ) {
skipConvertor = false ;
if ( commandBatch . getAttachedCommands ( ) ! = null ) {
commandsData . set ( commandBatch . getAttachedCommands ( ) ) ;
commandsData = commandBatch . getAttachedCommands ( ) ;
} else {
commandsData . set ( commandBatch . getCommands ( ) ) ;
commandsData = commandBatch . getCommands ( ) ;
}
}
}
try {
decode ( in , commandData , null , ctx , skipConvertor ) ;
} finally {
if ( commandData ! = null & & RedisCommands . EXEC . getName ( ) . equals ( commandData . getCommand ( ) . getName ( ) ) ) {
commandsData . remove ( ) ;
}
}
decode ( in , commandData , null , channel , skipConvertor , commandsData ) ;
if ( commandData ! = null & & RedisCommands . EXEC . getName ( ) . equals ( commandData . getCommand ( ) . getName ( ) )
& & commandData . getPromise ( ) . isSuccess ( ) ) {
@ -229,7 +272,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
Object res = iter . next ( ) ;
completeResponse ( ( CommandData < Object , Object > ) command , res , c tx. c hannel( ) ) ;
completeResponse ( ( CommandData < Object , Object > ) command , res , c hannel) ;
}
if ( RedisCommands . MULTI . getName ( ) . equals ( command . getCommand ( ) . getName ( ) ) ) {
@ -250,27 +293,30 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
if ( commandBatch . isSkipResult ( ) | | i = = commandBatch . getCommands ( ) . size ( ) ) {
RPromise < Void > promise = commandBatch . getPromise ( ) ;
if ( error ! = null ) {
if ( ! promise . tryFailure ( error ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , ctx . channel ( ) , LogHelper . toString ( data ) ) ;
}
} else {
if ( ! promise . trySuccess ( null ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , ctx . channel ( ) , LogHelper . toString ( data ) ) ;
if ( decoderStatus . get ( ) ! = Status . FILL_BUFFER ) {
RPromise < Void > promise = commandBatch . getPromise ( ) ;
if ( error ! = null ) {
if ( ! promise . tryFailure ( error ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , channel , LogHelper . toString ( data ) ) ;
}
} else {
if ( ! promise . trySuccess ( null ) & & promise . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}" , channel , LogHelper . toString ( data ) ) ;
}
}
}
sendNext ( c tx ) ;
sendNext ( c hannel ) ;
} else {
checkpoint ( ) ;
state ( ) . setBatchIndex ( i ) ;
if ( decoderStatus . get ( ) ! = Status . DECODE_BUFFER ) {
checkpoint ( ) ;
state . get ( ) . setBatchIndex ( i ) ;
}
}
}
protected void decode ( ByteBuf in , CommandData < Object , Object > data , List < Object > parts , Channel HandlerContext ctx , boolean skipConvertor ) throws IOException {
protected void decode ( ByteBuf in , CommandData < Object , Object > data , List < Object > parts , Channel channel , boolean skipConvertor , List < CommandData < ? , ? > > commandsData ) throws IOException {
int code = in . readByte ( ) ;
Channel channel = ctx . channel ( ) ;
if ( code = = '+' ) {
String result = readString ( in ) ;
@ -315,30 +361,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Object result = null ;
if ( buf ! = null ) {
Decoder < Object > decoder = selectDecoder ( data , parts ) ;
result = decoder . decode ( buf , state () ) ;
result = decoder . decode ( buf , state .get () ) ;
}
handleResult ( data , parts , result , false , channel ) ;
} else if ( code = = '*' ) {
long size = readLong ( in ) ;
final List < Object > respParts = new ArrayList < Object > ( Math . max ( ( int ) size , 0 ) ) ;
List < Object > respParts = new ArrayList < Object > ( Math . max ( ( int ) size , 0 ) ) ;
StateLevel lastLevel = null ;
if ( state ( ) . isMakeCheckpoint ( ) ) {
lastLevel = new StateLevel ( size , respParts ) ;
state ( ) . addLevel ( lastLevel ) ;
}
state ( ) . incLevel ( ) ;
state . get ( ) . incLevel ( ) ;
decodeList ( in , data , parts , c tx , size , respParts , skipConvertor ) ;
decodeList ( in , data , parts , channel , size , respParts , skipConvertor , commandsData ) ;
state ( ) . decLevel ( ) ;
if ( state ( ) . isMakeCheckpoint ( ) ) {
if ( lastLevel = = state ( ) . getLastLevel ( ) & & lastLevel . isFull ( ) ) {
state ( ) . removeLastLevel ( ) ;
}
}
state . get ( ) . decLevel ( ) ;
} else {
String dataStr = in . toString ( 0 , in . writerIndex ( ) , CharsetUtil . UTF_8 ) ;
@ -355,31 +389,23 @@ public class CommandDecoder extends ReplayingDecoder<State> {
@SuppressWarnings ( "unchecked" )
private void decodeList ( ByteBuf in , CommandData < Object , Object > data , List < Object > parts ,
Channel HandlerContext ctx , long size , List < Object > respParts , boolean skipConvertor )
Channel channel , long size , List < Object > respParts , boolean skipConvertor , List < CommandData < ? , ? > > commandsData )
throws IOException {
if ( parts = = null & & commandsData . get ( ) ! = null ) {
List < CommandData < ? , ? > > commands = commandsData . get ( ) ;
if ( parts = = null & & commandsData ! = null ) {
for ( int i = respParts . size ( ) ; i < size ; i + + ) {
int suffix = 0 ;
if ( RedisCommands . MULTI . getName ( ) . equals ( commands . get ( 0 ) . getCommand ( ) . getName ( ) ) ) {
if ( RedisCommands . MULTI . getName ( ) . equals ( commands Data . get ( 0 ) . getCommand ( ) . getName ( ) ) ) {
suffix = 1 ;
}
CommandData < Object , Object > commandData = ( CommandData < Object , Object > ) commands . get ( i + suffix ) ;
decode ( in , commandData , respParts , c tx, skipConvertor ) ;
CommandData < Object , Object > commandData = ( CommandData < Object , Object > ) commands Data . get ( i + suffix ) ;
decode ( in , commandData , respParts , c hannel, skipConvertor , commandsData ) ;
if ( commandData . getPromise ( ) . isDone ( ) & & ! commandData . getPromise ( ) . isSuccess ( ) ) {
data . tryFailure ( commandData . cause ( ) ) ;
}
if ( state ( ) . isMakeCheckpoint ( ) ) {
checkpoint ( ) ;
}
}
} else {
for ( int i = respParts . size ( ) ; i < size ; i + + ) {
decode ( in , data , respParts , ctx , skipConvertor ) ;
if ( state ( ) . isMakeCheckpoint ( ) ) {
checkpoint ( ) ;
}
decode ( in , data , respParts , channel , skipConvertor , null ) ;
}
}
@ -388,19 +414,23 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return ;
}
Object result = decoder . decode ( respParts , state ( ) ) ;
decodeResult ( data , parts , ctx , result ) ;
if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
return ;
}
Object result = decoder . decode ( respParts , state . get ( ) ) ;
decodeResult ( data , parts , channel , result ) ;
}
protected void decodeResult ( CommandData < Object , Object > data , List < Object > parts , ChannelHandlerContext ctx ,
protected void decodeResult ( CommandData < Object , Object > data , List < Object > parts , Channel channel ,
Object result ) throws IOException {
if ( data ! = null ) {
handleResult ( data , parts , result , true , c tx. c hannel( ) ) ;
handleResult ( data , parts , result , true , c hannel) ;
}
}
private void handleResult ( CommandData < Object , Object > data , List < Object > parts , Object result , boolean skipConvertor , Channel channel ) {
if ( data ! = null & & ! skipConvertor ) {
if ( data ! = null & & ! skipConvertor & & decoderStatus . get ( ) ! = Status . FILL_BUFFER ) {
result = data . getCommand ( ) . getConvertor ( ) . convert ( result ) ;
}
if ( parts ! = null ) {
@ -411,6 +441,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
protected void completeResponse ( CommandData < Object , Object > data , Object result , Channel channel ) {
if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
return ;
}
if ( data ! = null & & ! data . getPromise ( ) . trySuccess ( result ) & & data . cause ( ) instanceof RedisTimeoutException ) {
log . warn ( "response has been skipped due to timeout! channel: {}, command: {}, result: {}" , channel , LogHelper . toString ( data ) , LogHelper . toString ( result ) ) ;
}
@ -433,36 +467,46 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if ( parts ! = null ) {
MultiDecoder < Object > multiDecoder = data . getCommand ( ) . getReplayMultiDecoder ( ) ;
if ( multiDecoder ! = null ) {
Decoder < Object > mDecoder = multiDecoder . getDecoder ( parts . size ( ) , state () ) ;
Decoder < Object > mDecoder = multiDecoder . getDecoder ( parts . size ( ) , state .get () ) ;
if ( mDecoder ! = null ) {
return mDecoder ;
}
}
}
Codec codec = data . getCodec ( ) ;
if ( decodeInExecutor & & ! ( codec instanceof StringCodec | | codec instanceof ByteArrayCodec ) ) {
if ( decoderStatus . get ( ) = = Status . NORMAL ) {
decoderStatus . set ( Status . FILL_BUFFER ) ;
codec = NullCodec . INSTANCE ;
} else if ( decoderStatus . get ( ) = = Status . FILL_BUFFER ) {
codec = NullCodec . INSTANCE ;
}
}
Decoder < Object > decoder = data . getCommand ( ) . getReplayDecoder ( ) ;
if ( decoder = = null ) {
if ( data . getCodec ( ) = = null ) {
if ( codec = = null ) {
return StringCodec . INSTANCE . getValueDecoder ( ) ;
}
if ( data . getCommand ( ) . getOutParamType ( ) = = ValueType . MAP ) {
if ( parts ! = null & & parts . size ( ) % 2 ! = 0 ) {
return data . getCodec ( ) . getMapValueDecoder ( ) ;
return codec . getMapValueDecoder ( ) ;
} else {
return data . getCodec ( ) . getMapKeyDecoder ( ) ;
return codec . getMapKeyDecoder ( ) ;
}
} else if ( data . getCommand ( ) . getOutParamType ( ) = = ValueType . MAP_KEY ) {
return data. getCodec ( ) . getMapKeyDecoder ( ) ;
return codec . getMapKeyDecoder ( ) ;
} else if ( data . getCommand ( ) . getOutParamType ( ) = = ValueType . MAP_VALUE ) {
return data. getCodec ( ) . getMapValueDecoder ( ) ;
return codec . getMapValueDecoder ( ) ;
} else {
return data. getCodec ( ) . getValueDecoder ( ) ;
return codec . getValueDecoder ( ) ;
}
}
return decoder ;
}
p ublic ByteBuf readBytes ( ByteBuf is ) throws IOException {
p rivate ByteBuf readBytes ( ByteBuf is ) throws IOException {
long l = readLong ( is ) ;
if ( l > Integer . MAX_VALUE ) {
throw new IllegalArgumentException (
@ -481,7 +525,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return buffer ;
}
p ublic static long readLong ( ByteBuf is ) throws IOException {
p rivate long readLong ( ByteBuf is ) throws IOException {
long size = 0 ;
int sign = 1 ;
int read = is . readByte ( ) ;