Fixed - Spring Data RedissonConnection#del() doesn’t participate in pipeline. #2680

pull/2691/head
Nikita Koksharov 5 years ago
parent faebedbeab
commit 8be90089a3

@ -61,6 +61,7 @@
<module>redisson-tomcat</module>
<module>redisson-spring-data</module>
<module>redisson-spring-boot-starter</module>
<module>redisson-spring-cloud-connector</module>
<module>redisson-mybatis</module>
<module>redisson-hibernate</module>
</modules>

@ -200,78 +200,9 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key);
}
private void checkExecution(final RPromise<Long> result, final AtomicReference<Throwable> failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
if (!executorService.getConnectionManager().isClusterMode()) {
return executorService.writeAsync(null, command, Arrays.asList(keys).toArray());
}
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<MasterSlaveEntry, List<byte[]>>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<byte[]>();
range2key.put(entry, list);
}
list.add(key);
}
final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = new BiConsumer<BatchResult<?>, Throwable>() {
@Override
public void accept(BatchResult<?> r, Throwable u) {
if (u == null) {
List<Long> result = (List<Long>) r.getResponses();
for (Long res : result) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
}
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray());
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new DataTypeConvertor());

@ -230,78 +230,9 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key);
}
private void checkExecution(final RPromise<Long> result, final AtomicReference<Throwable> failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
if (!executorService.getConnectionManager().isClusterMode()) {
return executorService.writeAsync(null, command, Arrays.asList(keys).toArray());
}
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<MasterSlaveEntry, List<byte[]>>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<byte[]>();
range2key.put(entry, list);
}
list.add(key);
}
final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = new BiConsumer<BatchResult<?>, Throwable>() {
@Override
public void accept(BatchResult<?> r, Throwable u) {
if (u == null) {
List<Long> result = (List<Long>) r.getResponses();
for (Long res : result) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
}
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray());
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new DataTypeConvertor());

@ -236,78 +236,9 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key);
}
private void checkExecution(final RPromise<Long> result, final AtomicReference<Throwable> failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
if (!executorService.getConnectionManager().isClusterMode()) {
return executorService.writeAsync(null, command, Arrays.asList(keys).toArray());
}
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<MasterSlaveEntry, List<byte[]>>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<byte[]>();
range2key.put(entry, list);
}
list.add(key);
}
final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = new BiConsumer<BatchResult<?>, Throwable>() {
@Override
public void accept(BatchResult<?> r, Throwable u) {
if (u == null) {
List<Long> result = (List<Long>) r.getResponses();
for (Long res : result) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
}
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray());
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new DataTypeConvertor());

@ -203,78 +203,9 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key);
}
private void checkExecution(final RPromise<Long> result, final AtomicReference<Throwable> failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
if (!executorService.getConnectionManager().isClusterMode()) {
return executorService.writeAsync(null, command, Arrays.asList(keys).toArray());
}
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<MasterSlaveEntry, List<byte[]>>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<byte[]>();
range2key.put(entry, list);
}
list.add(key);
}
final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = new BiConsumer<BatchResult<?>, Throwable>() {
@Override
public void accept(BatchResult<?> r, Throwable u) {
if (u == null) {
List<Long> result = (List<Long>) r.getResponses();
for (Long res : result) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
}
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray());
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new DataTypeConvertor());

@ -233,78 +233,9 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key);
}
private void checkExecution(final RPromise<Long> result, final AtomicReference<Throwable> failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
if (!executorService.getConnectionManager().isClusterMode()) {
return executorService.writeAsync(null, command, Arrays.asList(keys).toArray());
}
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<MasterSlaveEntry, List<byte[]>>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<byte[]>();
range2key.put(entry, list);
}
list.add(key);
}
final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = new BiConsumer<BatchResult<?>, Throwable>() {
@Override
public void accept(BatchResult<?> r, Throwable u) {
if (u == null) {
List<Long> result = (List<Long>) r.getResponses();
for (Long res : result) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
}
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray());
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new DataTypeConvertor());

@ -225,78 +225,9 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key);
}
private void checkExecution(final RPromise<Long> result, final AtomicReference<Throwable> failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
if (!executorService.getConnectionManager().isClusterMode()) {
return executorService.writeAsync(null, command, Arrays.asList(keys).toArray());
}
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<MasterSlaveEntry, List<byte[]>>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<byte[]>();
range2key.put(entry, list);
}
list.add(key);
}
final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = new BiConsumer<BatchResult<?>, Throwable>() {
@Override
public void accept(BatchResult<?> r, Throwable u) {
if (u == null) {
List<Long> result = (List<Long>) r.getResponses();
for (Long res : result) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
}
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray());
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new DataTypeConvertor());

@ -9,6 +9,24 @@ import org.redisson.BaseTest;
public class RedissonPipelineConnectionTest extends BaseConnectionTest {
@Test
public void testDel() {
RedissonConnection connection = new RedissonConnection(redisson);
byte[] key = "my_key".getBytes();
byte[] value = "my_value".getBytes();
connection.set(key, value);
connection.openPipeline();
connection.get(key);
connection.del(key);
List<Object> results = connection.closePipeline();
byte[] val = (byte[])results.get(0);
assertThat(val).isEqualTo(value);
Long res = (Long) results.get(1);
assertThat(res).isEqualTo(1);
}
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);

Loading…
Cancel
Save