From 7c6054e939f7f87ee6394d02acf28ff3ffd50790 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 3 May 2021 10:44:57 +0300 Subject: [PATCH] Fixed - mGet() and mSet() methods of RedissonConnection object in Spring Data module throw CROSSSLOT error. #3582 --- .../connection/RedissonClusterConnection.java | 114 ++++++------------ .../RedissonClusterConnectionTest.java | 27 ++++- .../connection/RedissonClusterConnection.java | 114 ++++++------------ .../connection/RedissonClusterConnection.java | 113 ++++++----------- .../connection/RedissonClusterConnection.java | 113 ++++++----------- .../connection/RedissonClusterConnection.java | 113 ++++++----------- .../connection/RedissonClusterConnection.java | 113 ++++++----------- .../connection/RedissonClusterConnection.java | 113 ++++++----------- .../RedissonClusterConnectionTest.java | 49 +++++--- 9 files changed, 338 insertions(+), 531 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 841d9adda..93fce4b30 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,26 +15,10 @@ */ package org.redisson.spring.data.connection; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - +import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; -import org.redisson.client.RedisException; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -46,8 +30,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterConnection; @@ -57,7 +39,10 @@ import org.springframework.data.redis.connection.convert.StringToRedisClientInfo import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import io.netty.util.CharsetUtil; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * @@ -449,75 +434,54 @@ public class RedissonClusterConnection extends RedissonConnection implements Red return false; } - private void checkExecution(RPromise result, AtomicReference failed, - AtomicLong count, AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); + @Override + public Long del(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key: keys) { + write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); } - } - } - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - Map> range2key = new HashMap<>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); - list.add(key); + return null; } - RPromise result = new RedissonPromise<>(); - AtomicReference failed = new AtomicReference<>(); - AtomicLong count = new AtomicLong(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (r, u) -> { - if (u == null) { - List result1 = (List) r.getResponses(); - for (Long res : result1) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - }; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.writeAsync(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + } + BatchResult b = (BatchResult) es.execute(); + return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum(); + } - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); + @Override + public List mGet(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key : keys) { + read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); + return null; } - return result; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + BatchResult r = (BatchResult) es.execute(); + return r.getResponses(); } @Override - public Long del(byte[]... keys) { + public void mSet(Map tuple) { if (isQueueing() || isPipelined()) { - for (byte[] key: keys) { - write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + for (Entry entry: tuple.entrySet()) { + write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); } - - return null; + return; } - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + CommandBatchService es = new CommandBatchService(executorService); + for (Entry entry: tuple.entrySet()) { + es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); + } + es.execute(); } - } diff --git a/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index 873ae8c9a..d1403541e 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-17/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -186,9 +186,32 @@ public class RedissonClusterConnectionTest { keys.add(key); connection.set(key, ("test" + i).getBytes()); } - connection.del(keys.toArray(new byte[0][])); + assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10); } - + + @Test + public void testMSet() { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + for (Map.Entry entry : map.entrySet()) { + assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue()); + } + } + + @Test + public void testMGet() { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + List r = connection.mGet(map.keySet().toArray(new byte[0][])); + assertThat(r).containsExactly(map.values().toArray(new byte[0][])); + } + @Test public void testResetConfigStats() { RedisClusterNode master = getFirstMaster(); diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 5e4bba879..fc8e4212d 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,26 +15,10 @@ */ package org.redisson.spring.data.connection; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - +import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; -import org.redisson.client.RedisException; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -46,8 +30,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterConnection; @@ -57,7 +39,10 @@ import org.springframework.data.redis.connection.convert.StringToRedisClientInfo import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import io.netty.util.CharsetUtil; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * @@ -448,75 +433,54 @@ public class RedissonClusterConnection extends RedissonConnection implements Red return false; } - private void checkExecution(RPromise result, AtomicReference failed, - AtomicLong count, AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); + @Override + public Long del(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key: keys) { + write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); } - } - } - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - Map> range2key = new HashMap<>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); - list.add(key); + return null; } - RPromise result = new RedissonPromise<>(); - AtomicReference failed = new AtomicReference<>(); - AtomicLong count = new AtomicLong(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (r, u) -> { - if (u == null) { - List result1 = (List) r.getResponses(); - for (Long res : result1) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - }; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.writeAsync(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + } + BatchResult b = (BatchResult) es.execute(); + return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum(); + } - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); + @Override + public List mGet(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key : keys) { + read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); + return null; } - return result; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + BatchResult r = (BatchResult) es.execute(); + return r.getResponses(); } @Override - public Long del(byte[]... keys) { + public void mSet(Map tuple) { if (isQueueing() || isPipelined()) { - for (byte[] key: keys) { - write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + for (Entry entry: tuple.entrySet()) { + write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); } - - return null; + return; } - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + CommandBatchService es = new CommandBatchService(executorService); + for (Entry entry: tuple.entrySet()) { + es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); + } + es.execute(); } - } diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index cb8dac905..a5aa5e9a3 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,25 +15,10 @@ */ package org.redisson.spring.data.connection; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - +import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; -import org.redisson.client.RedisException; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -45,8 +30,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterConnection; @@ -57,7 +40,10 @@ import org.springframework.data.redis.connection.convert.StringToRedisClientInfo import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import io.netty.util.CharsetUtil; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * @@ -453,75 +439,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Red return false; } - private void checkExecution(RPromise result, AtomicReference failed, - AtomicLong count, AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); + @Override + public Long del(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key: keys) { + write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); } - } - } - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - Map> range2key = new HashMap<>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); - list.add(key); + return null; } - RPromise result = new RedissonPromise<>(); - AtomicReference failed = new AtomicReference<>(); - AtomicLong count = new AtomicLong(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (r, u) -> { - if (u == null) { - List result1 = (List) r.getResponses(); - for (Long res : result1) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - }; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.writeAsync(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + } + BatchResult b = (BatchResult) es.execute(); + return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum(); + } - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); + @Override + public List mGet(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key : keys) { + read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); + return null; } - return result; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + BatchResult r = (BatchResult) es.execute(); + return r.getResponses(); } @Override - public Long del(byte[]... keys) { + public Boolean mSet(Map tuple) { if (isQueueing() || isPipelined()) { - for (byte[] key: keys) { - write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + for (Entry entry: tuple.entrySet()) { + write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); } - - return null; + return true; } - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + CommandBatchService es = new CommandBatchService(executorService); + for (Entry entry: tuple.entrySet()) { + es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); + } + es.execute(); + return true; } } diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1d6e894c3..279a72c5c 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,26 +15,11 @@ */ package org.redisson.spring.data.connection; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - +import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.client.RedisClient; -import org.redisson.client.RedisException; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.DefaultedRedisClusterConnection; @@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import io.netty.util.CharsetUtil; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * @@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def return false; } - private void checkExecution(RPromise result, AtomicReference failed, - AtomicLong count, AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); + @Override + public Long del(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key: keys) { + write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); } - } - } - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - Map> range2key = new HashMap<>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); - list.add(key); + return null; } - RPromise result = new RedissonPromise<>(); - AtomicReference failed = new AtomicReference<>(); - AtomicLong count = new AtomicLong(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (r, u) -> { - if (u == null) { - List result1 = (List) r.getResponses(); - for (Long res : result1) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - }; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key); + } + BatchResult b = (BatchResult) es.execute(); + return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum(); + } - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); + @Override + public List mGet(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key : keys) { + read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); + return null; } - return result; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + BatchResult r = (BatchResult) es.execute(); + return r.getResponses(); } @Override - public Long del(byte[]... keys) { + public Boolean mSet(Map tuple) { if (isQueueing() || isPipelined()) { - for (byte[] key: keys) { - write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + for (Entry entry: tuple.entrySet()) { + write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); } - - return null; + return true; } - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + CommandBatchService es = new CommandBatchService(executorService); + for (Entry entry: tuple.entrySet()) { + es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); + } + es.execute(); + return true; } } diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1d6e894c3..279a72c5c 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,26 +15,11 @@ */ package org.redisson.spring.data.connection; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - +import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.client.RedisClient; -import org.redisson.client.RedisException; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.DefaultedRedisClusterConnection; @@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import io.netty.util.CharsetUtil; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * @@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def return false; } - private void checkExecution(RPromise result, AtomicReference failed, - AtomicLong count, AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); + @Override + public Long del(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key: keys) { + write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); } - } - } - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - Map> range2key = new HashMap<>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); - list.add(key); + return null; } - RPromise result = new RedissonPromise<>(); - AtomicReference failed = new AtomicReference<>(); - AtomicLong count = new AtomicLong(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (r, u) -> { - if (u == null) { - List result1 = (List) r.getResponses(); - for (Long res : result1) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - }; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key); + } + BatchResult b = (BatchResult) es.execute(); + return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum(); + } - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); + @Override + public List mGet(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key : keys) { + read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); + return null; } - return result; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + BatchResult r = (BatchResult) es.execute(); + return r.getResponses(); } @Override - public Long del(byte[]... keys) { + public Boolean mSet(Map tuple) { if (isQueueing() || isPipelined()) { - for (byte[] key: keys) { - write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + for (Entry entry: tuple.entrySet()) { + write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); } - - return null; + return true; } - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + CommandBatchService es = new CommandBatchService(executorService); + for (Entry entry: tuple.entrySet()) { + es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); + } + es.execute(); + return true; } } diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1d6e894c3..279a72c5c 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,26 +15,11 @@ */ package org.redisson.spring.data.connection; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - +import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.client.RedisClient; -import org.redisson.client.RedisException; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.DefaultedRedisClusterConnection; @@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import io.netty.util.CharsetUtil; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * @@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def return false; } - private void checkExecution(RPromise result, AtomicReference failed, - AtomicLong count, AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); + @Override + public Long del(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key: keys) { + write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); } - } - } - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - Map> range2key = new HashMap<>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); - list.add(key); + return null; } - RPromise result = new RedissonPromise<>(); - AtomicReference failed = new AtomicReference<>(); - AtomicLong count = new AtomicLong(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (r, u) -> { - if (u == null) { - List result1 = (List) r.getResponses(); - for (Long res : result1) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - }; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key); + } + BatchResult b = (BatchResult) es.execute(); + return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum(); + } - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); + @Override + public List mGet(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key : keys) { + read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); + return null; } - return result; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + BatchResult r = (BatchResult) es.execute(); + return r.getResponses(); } @Override - public Long del(byte[]... keys) { + public Boolean mSet(Map tuple) { if (isQueueing() || isPipelined()) { - for (byte[] key: keys) { - write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + for (Entry entry: tuple.entrySet()) { + write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); } - - return null; + return true; } - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + CommandBatchService es = new CommandBatchService(executorService); + for (Entry entry: tuple.entrySet()) { + es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); + } + es.execute(); + return true; } } diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java index 1d6e894c3..279a72c5c 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -15,26 +15,11 @@ */ package org.redisson.spring.data.connection; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - +import io.netty.util.CharsetUtil; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.client.RedisClient; -import org.redisson.client.RedisException; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.DefaultedRedisClusterConnection; @@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import io.netty.util.CharsetUtil; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * @@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def return false; } - private void checkExecution(RPromise result, AtomicReference failed, - AtomicLong count, AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); + @Override + public Long del(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key: keys) { + write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); } - } - } - private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { - Map> range2key = new HashMap<>(); - for (byte[] key : keys) { - int slot = executorService.getConnectionManager().calcSlot(key); - MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); - List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); - list.add(key); + return null; } - RPromise result = new RedissonPromise<>(); - AtomicReference failed = new AtomicReference<>(); - AtomicLong count = new AtomicLong(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (r, u) -> { - if (u == null) { - List result1 = (List) r.getResponses(); - for (Long res : result1) { - if (res != null) { - count.addAndGet(res); - } - } - } else { - failed.set(u); - } - - checkExecution(result, failed, count, executed); - }; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key); + } + BatchResult b = (BatchResult) es.execute(); + return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum(); + } - for (Entry> entry : range2key.entrySet()) { - CommandBatchService es = new CommandBatchService(executorService); - for (byte[] key : entry.getValue()) { - es.writeAsync(entry.getKey(), null, command, key); + @Override + public List mGet(byte[]... keys) { + if (isQueueing() || isPipelined()) { + for (byte[] key : keys) { + read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); } - - RFuture> future = es.executeAsync(); - future.onComplete(listener); + return null; } - return result; + CommandBatchService es = new CommandBatchService(executorService); + for (byte[] key: keys) { + es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + BatchResult r = (BatchResult) es.execute(); + return r.getResponses(); } @Override - public Long del(byte[]... keys) { + public Boolean mSet(Map tuple) { if (isQueueing() || isPipelined()) { - for (byte[] key: keys) { - write(key, LongCodec.INSTANCE, RedisCommands.DEL, key); + for (Entry entry: tuple.entrySet()) { + write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); } - - return null; + return true; } - RFuture f = executeAsync(RedisCommands.DEL, keys); - return sync(f); + CommandBatchService es = new CommandBatchService(executorService); + for (Entry entry: tuple.entrySet()) { + es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue()); + } + es.execute(); + return true; } } diff --git a/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java b/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java index cfdff0df0..7d389f86e 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java +++ b/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonClusterConnectionTest.java @@ -13,7 +13,6 @@ import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; -import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -23,8 +22,7 @@ import org.springframework.data.redis.core.types.RedisClientInfo; import java.io.IOException; import java.util.*; -import static org.assertj.core.api.Assertions.*; -import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT; +import static org.assertj.core.api.Assertions.assertThat; public class RedissonClusterConnectionTest { @@ -64,6 +62,40 @@ public class RedissonClusterConnectionTest { redisson.shutdown(); } + @Test + public void testDel() { + List keys = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + byte[] key = ("test" + i).getBytes(); + keys.add(key); + connection.set(key, ("test" + i).getBytes()); + } + assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10); + } + + @Test + public void testMSet() { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + for (Map.Entry entry : map.entrySet()) { + assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue()); + } + } + + @Test + public void testMGet() { + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put(("test" + i).getBytes(), ("test" + i*100).getBytes()); + } + connection.mSet(map); + List r = connection.mGet(map.keySet().toArray(new byte[0][])); + assertThat(r).containsExactly(map.values().toArray(new byte[0][])); + } + @Test public void testClusterGetNodes() { Iterable nodes = connection.clusterGetNodes(); @@ -194,17 +226,6 @@ public class RedissonClusterConnectionTest { assertThat(res).isEqualTo(1); } - @Test - public void testDel() { - List keys = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - byte[] key = ("test" + i).getBytes(); - keys.add(key); - connection.set(key, ("test" + i).getBytes()); - } - connection.del(keys.toArray(new byte[0][])); - } - @Test public void testResetConfigStats() { RedisClusterNode master = getFirstMaster();