Pipeline support for Spring Data Redis integration. #1373

pull/1547/head
Nikita 7 years ago
parent d5e0e6e97e
commit a489a82f5d

@ -22,10 +22,12 @@ import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
@ -149,7 +151,15 @@ public class RedissonConnection extends AbstractRedisConnection {
public boolean isPipelined() {
if (executorService instanceof CommandBatchService) {
CommandBatchService es = (CommandBatchService) executorService;
return es.getOptions().getExecutionMode() == ExecutionMode.IN_MEMORY;
return es.getOptions().getExecutionMode() == ExecutionMode.IN_MEMORY || es.getOptions().getExecutionMode() == ExecutionMode.IN_MEMORY_ATOMIC;
}
return false;
}
public boolean isPipelinedAtomic() {
if (executorService instanceof CommandBatchService) {
CommandBatchService es = (CommandBatchService) executorService;
return es.getOptions().getExecutionMode() == ExecutionMode.IN_MEMORY_ATOMIC;
}
return false;
}
@ -167,13 +177,18 @@ public class RedissonConnection extends AbstractRedisConnection {
CommandBatchService es = (CommandBatchService) executorService;
try {
BatchResult<?> result = es.execute();
filterResults(result);
if (isPipelinedAtomic()) {
return Arrays.<Object>asList((List<Object>) result.getResponses());
}
return (List<Object>) result.getResponses();
} catch (Exception ex) {
throw new RedisPipelineException(ex);
} finally {
executorService = (CommandAsyncService) this.redisson.getCommandExecutor();
resetConnection();
}
} else {
throw new InvalidDataAccessApiUsageException("Not in pipeline mode. Please invoke multi method");
}
return Collections.emptyList();
}
@Override
@ -187,6 +202,10 @@ public class RedissonConnection extends AbstractRedisConnection {
}
return t;
} catch (IllegalArgumentException e) {
if (isPipelined()) {
throw new RedisPipelineException(e);
}
throw new InvalidDataAccessApiUsageException(e.getMessage(), e);
}
}
@ -306,10 +325,14 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(key, StringCodec.INSTANCE, TYPE, key);
}
private final RedisStrictCommand<Set<byte[]>> KEYS = new RedisStrictCommand<Set<byte[]>>("KEYS", new SetReplayDecoder<byte[]>(ByteArrayCodec.INSTANCE.getValueDecoder()));
private static final RedisStrictCommand<Set<byte[]>> KEYS = new RedisStrictCommand<Set<byte[]>>("KEYS", new SetReplayDecoder<byte[]>(ByteArrayCodec.INSTANCE.getValueDecoder()));
@Override
public Set<byte[]> keys(byte[] pattern) {
if (isQueueing()) {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, KEYS, pattern));
return sync(f);
@ -366,6 +389,10 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public byte[] randomKey() {
if (isQueueing()) {
return read(null, ByteArrayCodec.INSTANCE, RedisCommands.RANDOM_KEY);
}
RFuture<byte[]> f = executorService.readRandomAsync(ByteArrayCodec.INSTANCE, RedisCommands.RANDOM_KEY);
return sync(f);
}
@ -781,13 +808,29 @@ public class RedissonConnection extends AbstractRedisConnection {
return write(key, StringCodec.INSTANCE, LINSERT, key, where, pivot, value);
}
private final List<String> commandsToRemove = Arrays.asList("SET",
"RESTORE", "LTRIM", "SETEX", "SETRANGE", "FLUSHDB", "LSET", "MSET", "HMSET", "RENAME");
private final List<Integer> indexToRemove = new ArrayList<Integer>();
private int index = -1;
private <T> T write(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
RFuture<T> f = executorService.writeAsync(key, codec, command, params);
indexCommand(command);
return sync(f);
}
protected void indexCommand(RedisCommand<?> command) {
if (isQueueing() || isPipelined()) {
index++;
if (commandsToRemove.contains(command.getName())) {
indexToRemove.add(index);
}
}
}
private <T> T read(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
RFuture<T> f = executorService.readAsync(key, codec, command, params);
indexCommand(command);
return sync(f);
}
@ -1513,6 +1556,17 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public void multi() {
if (isQueueing()) {
return;
}
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
@ -1520,31 +1574,50 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public List<Object> exec() {
if (isPipelinedAtomic()) {
return null;
}
if (isQueueing()) {
try {
BatchResult<?> result = ((CommandBatchService)executorService).execute();
for (Iterator<Object> iterator = (Iterator<Object>) result.getResponses().iterator(); iterator.hasNext();) {
Object object = iterator.next();
if (object == null) {
iterator.remove();
}
}
filterResults(result);
return (List<Object>) result.getResponses();
} catch (Exception ex) {
throw transform(ex);
} finally {
executorService = (CommandAsyncService) this.redisson.getCommandExecutor();
resetConnection();
}
} else {
throw new InvalidDataAccessApiUsageException("Not in transaction mode. Please invoke multi method");
}
}
protected void filterResults(BatchResult<?> result) {
int t = 0;
for (Integer index : indexToRemove) {
index -= t;
result.getResponses().remove((int)index);
t++;
}
for (ListIterator<Object> iterator = (ListIterator<Object>) result.getResponses().listIterator(); iterator.hasNext();) {
Object object = iterator.next();
if (object instanceof String) {
iterator.set(((String) object).getBytes());
}
}
}
protected void resetConnection() {
executorService = (CommandAsyncService) this.redisson.getCommandExecutor();
index = -1;
indexToRemove.clear();
}
@Override
public void discard() {
if (isQueueing()) {
syncFuture(executorService.writeAsync(null, RedisCommands.DISCARD));
executorService = (CommandAsyncService) this.redisson.getCommandExecutor();
resetConnection();
} else {
throw new InvalidDataAccessApiUsageException("Not in transaction mode. Please invoke multi method");
}
@ -1660,6 +1733,10 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public Long dbSize() {
if (isQueueing()) {
return read(null, StringCodec.INSTANCE, RedisCommands.DBSIZE);
}
RFuture<Long> f = executorService.readAllAsync(RedisCommands.DBSIZE, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
@ -1677,6 +1754,11 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public void flushDb() {
if (isQueueing() || isPipelined()) {
write(null, StringCodec.INSTANCE, RedisCommands.FLUSHDB);
return;
}
RFuture<Void> f = executorService.writeAllAsync(RedisCommands.FLUSHDB);
sync(f);
}
@ -1771,6 +1853,10 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public void scriptFlush() {
if (isQueueing() || isPipelined()) {
throw new UnsupportedOperationException();
}
RFuture<Void> f = executorService.writeAllAsync(RedisCommands.SCRIPT_FLUSH);
sync(f);
}
@ -1782,6 +1868,13 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public String scriptLoad(byte[] script) {
if (isQueueing()) {
throw new UnsupportedOperationException();
}
if (isPipelined()) {
throw new UnsupportedOperationException();
}
RFuture<String> f = executorService.writeAllAsync(StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, new SlotCallback<String, String>() {
volatile String result;
@Override
@ -1799,6 +1892,10 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public List<Boolean> scriptExists(final String... scriptShas) {
if (isQueueing() || isPipelined()) {
throw new UnsupportedOperationException();
}
RFuture<List<Boolean>> f = executorService.writeAllAsync(RedisCommands.SCRIPT_EXISTS, new SlotCallback<List<Boolean>, List<Boolean>>() {
List<Boolean> result = new ArrayList<Boolean>(scriptShas.length);
@ -1823,6 +1920,13 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public <T> T eval(byte[] script, ReturnType returnType, int numKeys, byte[]... keysAndArgs) {
if (isQueueing()) {
throw new UnsupportedOperationException();
}
if (isPipelined()) {
throw new UnsupportedOperationException();
}
RedisCommand<?> c = toCommand(returnType, "EVAL");
List<Object> params = new ArrayList<Object>();
params.add(script);
@ -1851,6 +1955,13 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public <T> T evalSha(String scriptSha, ReturnType returnType, int numKeys, byte[]... keysAndArgs) {
if (isQueueing()) {
throw new UnsupportedOperationException();
}
if (isPipelined()) {
throw new UnsupportedOperationException();
}
RedisCommand<?> c = toCommand(returnType, "EVALSHA");
List<Object> params = new ArrayList<Object>();
params.add(scriptSha);

Loading…
Cancel
Save