From 8be90089a36e9aab31121bb0771d25f82d8a65a7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 9 Apr 2020 14:46:07 +0300 Subject: [PATCH] =?UTF-8?q?Fixed=20-=20Spring=20Data=20RedissonConnection#?= =?UTF-8?q?del()=20doesn=E2=80=99t=20participate=20in=20pipeline.=20#2680?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 1 + .../data/connection/RedissonConnection.java | 71 +------------------ .../data/connection/RedissonConnection.java | 71 +------------------ .../data/connection/RedissonConnection.java | 71 +------------------ .../data/connection/RedissonConnection.java | 71 +------------------ .../data/connection/RedissonConnection.java | 71 +------------------ .../data/connection/RedissonConnection.java | 71 +------------------ .../RedissonPipelineConnectionTest.java | 18 +++++ 8 files changed, 25 insertions(+), 420 deletions(-) diff --git a/pom.xml b/pom.xml index 55e24584c..5c883fc00 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,7 @@ redisson-tomcat redisson-spring-data redisson-spring-boot-starter + redisson-spring-cloud-connector redisson-mybatis redisson-hibernate diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 5faa05d1a..5d90b4ea4 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -200,78 +200,9 @@ public class RedissonConnection extends AbstractRedisConnection { return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key); } - private void checkExecution(final RPromise result, final AtomicReference failed, - final AtomicLong count, final AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); - } - } - } - - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - if (!executorService.getConnectionManager().isClusterMode()) { - return executorService.writeAsync(null, command, Arrays.asList(keys).toArray()); - } - - Map> range2key = new HashMap>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); - } - - final RPromise result = new RedissonPromise(); - final AtomicReference failed = new AtomicReference(); - final AtomicLong count = new AtomicLong(); - final AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { - @Override - public void accept(BatchResult r, Throwable u) { - if (u == null) { - List result = (List) r.getResponses(); - for (Long res : result) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - } - }; - - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService.getConnectionManager()); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); - } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); - } - - return result; - } - @Override public Long del(byte[]... keys) { - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray()); } private static final RedisStrictCommand TYPE = new RedisStrictCommand("TYPE", new DataTypeConvertor()); diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 52658e90b..4777cad79 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -230,78 +230,9 @@ public class RedissonConnection extends AbstractRedisConnection { return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key); } - private void checkExecution(final RPromise result, final AtomicReference failed, - final AtomicLong count, final AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); - } - } - } - - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - if (!executorService.getConnectionManager().isClusterMode()) { - return executorService.writeAsync(null, command, Arrays.asList(keys).toArray()); - } - - Map> range2key = new HashMap>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); - } - - final RPromise result = new RedissonPromise(); - final AtomicReference failed = new AtomicReference(); - final AtomicLong count = new AtomicLong(); - final AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { - @Override - public void accept(BatchResult r, Throwable u) { - if (u == null) { - List result = (List) r.getResponses(); - for (Long res : result) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - } - }; - - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService.getConnectionManager()); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); - } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); - } - - return result; - } - @Override public Long del(byte[]... keys) { - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray()); } private static final RedisStrictCommand TYPE = new RedisStrictCommand("TYPE", new DataTypeConvertor()); diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 6a59fb7eb..a2afbf13e 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -236,78 +236,9 @@ public class RedissonConnection extends AbstractRedisConnection { return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key); } - private void checkExecution(final RPromise result, final AtomicReference failed, - final AtomicLong count, final AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); - } - } - } - - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - if (!executorService.getConnectionManager().isClusterMode()) { - return executorService.writeAsync(null, command, Arrays.asList(keys).toArray()); - } - - Map> range2key = new HashMap>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); - } - - final RPromise result = new RedissonPromise(); - final AtomicReference failed = new AtomicReference(); - final AtomicLong count = new AtomicLong(); - final AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { - @Override - public void accept(BatchResult r, Throwable u) { - if (u == null) { - List result = (List) r.getResponses(); - for (Long res : result) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - } - }; - - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService.getConnectionManager()); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); - } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); - } - - return result; - } - @Override public Long del(byte[]... keys) { - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray()); } private static final RedisStrictCommand TYPE = new RedisStrictCommand("TYPE", new DataTypeConvertor()); diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 785c967ca..ba40bc750 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -203,78 +203,9 @@ public class RedissonConnection extends AbstractRedisConnection { return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key); } - private void checkExecution(final RPromise result, final AtomicReference failed, - final AtomicLong count, final AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); - } - } - } - - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - if (!executorService.getConnectionManager().isClusterMode()) { - return executorService.writeAsync(null, command, Arrays.asList(keys).toArray()); - } - - Map> range2key = new HashMap>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); - } - - final RPromise result = new RedissonPromise(); - final AtomicReference failed = new AtomicReference(); - final AtomicLong count = new AtomicLong(); - final AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { - @Override - public void accept(BatchResult r, Throwable u) { - if (u == null) { - List result = (List) r.getResponses(); - for (Long res : result) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - } - }; - - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService.getConnectionManager()); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); - } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); - } - - return result; - } - @Override public Long del(byte[]... keys) { - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray()); } private static final RedisStrictCommand TYPE = new RedisStrictCommand("TYPE", new DataTypeConvertor()); diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index a476904ae..faa236a4f 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -233,78 +233,9 @@ public class RedissonConnection extends AbstractRedisConnection { return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key); } - private void checkExecution(final RPromise result, final AtomicReference failed, - final AtomicLong count, final AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); - } - } - } - - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - if (!executorService.getConnectionManager().isClusterMode()) { - return executorService.writeAsync(null, command, Arrays.asList(keys).toArray()); - } - - Map> range2key = new HashMap>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); - } - - final RPromise result = new RedissonPromise(); - final AtomicReference failed = new AtomicReference(); - final AtomicLong count = new AtomicLong(); - final AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { - @Override - public void accept(BatchResult r, Throwable u) { - if (u == null) { - List result = (List) r.getResponses(); - for (Long res : result) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - } - }; - - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService.getConnectionManager()); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); - } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); - } - - return result; - } - @Override public Long del(byte[]... keys) { - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray()); } private static final RedisStrictCommand TYPE = new RedisStrictCommand("TYPE", new DataTypeConvertor()); diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 4c5fa1ee8..fb5e3aedc 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -225,78 +225,9 @@ public class RedissonConnection extends AbstractRedisConnection { return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key); } - private void checkExecution(final RPromise result, final AtomicReference failed, - final AtomicLong count, final AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); - } - } - } - - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - if (!executorService.getConnectionManager().isClusterMode()) { - return executorService.writeAsync(null, command, Arrays.asList(keys).toArray()); - } - - Map> range2key = new HashMap>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); - } - - final RPromise result = new RedissonPromise(); - final AtomicReference failed = new AtomicReference(); - final AtomicLong count = new AtomicLong(); - final AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { - @Override - public void accept(BatchResult r, Throwable u) { - if (u == null) { - List result = (List) r.getResponses(); - for (Long res : result) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - } - }; - - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService.getConnectionManager()); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); - } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); - } - - return result; - } - @Override public Long del(byte[]... keys) { - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + return write(keys[0], LongCodec.INSTANCE, RedisCommands.DEL, Arrays.asList(keys).toArray()); } private static final RedisStrictCommand TYPE = new RedisStrictCommand("TYPE", new DataTypeConvertor()); diff --git a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java index 038e9605d..643e7423a 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-22/src/test/java/org/redisson/spring/data/connection/RedissonPipelineConnectionTest.java @@ -9,6 +9,24 @@ import org.redisson.BaseTest; public class RedissonPipelineConnectionTest extends BaseConnectionTest { + @Test + public void testDel() { + RedissonConnection connection = new RedissonConnection(redisson); + byte[] key = "my_key".getBytes(); + byte[] value = "my_value".getBytes(); + connection.set(key, value); + + connection.openPipeline(); + connection.get(key); + connection.del(key); + + List results = connection.closePipeline(); + byte[] val = (byte[])results.get(0); + assertThat(val).isEqualTo(value); + Long res = (Long) results.get(1); + assertThat(res).isEqualTo(1); + } + @Test public void testEcho() { RedissonConnection connection = new RedissonConnection(redisson);