diff --git a/redisson/src/main/java/org/redisson/BooleanSlotCallback.java b/redisson/src/main/java/org/redisson/BooleanSlotCallback.java index 1f5feb52f..97ffd623e 100644 --- a/redisson/src/main/java/org/redisson/BooleanSlotCallback.java +++ b/redisson/src/main/java/org/redisson/BooleanSlotCallback.java @@ -38,7 +38,7 @@ public class BooleanSlotCallback implements SlotCallback { } @Override - public void onSlotResult(Boolean res) { + public void onSlotResult(List keys, Boolean res) { if (res) { r.set(true); } diff --git a/redisson/src/main/java/org/redisson/IntegerSlotCallback.java b/redisson/src/main/java/org/redisson/IntegerSlotCallback.java index e13a72125..1690c5e22 100644 --- a/redisson/src/main/java/org/redisson/IntegerSlotCallback.java +++ b/redisson/src/main/java/org/redisson/IntegerSlotCallback.java @@ -38,7 +38,7 @@ public class IntegerSlotCallback implements SlotCallback { } @Override - public void onSlotResult(Integer result) { + public void onSlotResult(List keys, Integer result) { results.addAndGet(result); } diff --git a/redisson/src/main/java/org/redisson/LongSlotCallback.java b/redisson/src/main/java/org/redisson/LongSlotCallback.java index 01dd059ff..2d2f2295e 100644 --- a/redisson/src/main/java/org/redisson/LongSlotCallback.java +++ b/redisson/src/main/java/org/redisson/LongSlotCallback.java @@ -38,7 +38,7 @@ public class LongSlotCallback implements SlotCallback { } @Override - public void onSlotResult(Long result) { + public void onSlotResult(List keys, Long result) { results.addAndGet(result); } diff --git a/redisson/src/main/java/org/redisson/RedissonBuckets.java b/redisson/src/main/java/org/redisson/RedissonBuckets.java index f6411433d..e0a9b8d02 100644 --- a/redisson/src/main/java/org/redisson/RedissonBuckets.java +++ b/redisson/src/main/java/org/redisson/RedissonBuckets.java @@ -87,7 +87,7 @@ public class RedissonBuckets implements RBuckets { final Map results = new ConcurrentHashMap<>(); @Override - public void onSlotResult(Map result) { + public void onSlotResult(List keys, Map result) { for (Map.Entry entry : result.entrySet()) { if (entry.getKey() != null && entry.getValue() != null) { String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) entry.getKey()); @@ -120,7 +120,7 @@ public class RedissonBuckets implements RBuckets { final AtomicBoolean result = new AtomicBoolean(true); @Override - public void onSlotResult(Boolean result) { + public void onSlotResult(List keys, Boolean result) { if (!result && this.result.get()){ this.result.set(result); } @@ -163,7 +163,7 @@ public class RedissonBuckets implements RBuckets { return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new SlotCallback() { @Override - public void onSlotResult(Void result) { + public void onSlotResult(List keys, Void result) { } @Override diff --git a/redisson/src/main/java/org/redisson/SlotCallback.java b/redisson/src/main/java/org/redisson/SlotCallback.java index 8298c9952..041127305 100644 --- a/redisson/src/main/java/org/redisson/SlotCallback.java +++ b/redisson/src/main/java/org/redisson/SlotCallback.java @@ -16,6 +16,7 @@ package org.redisson; import org.redisson.client.protocol.RedisCommand; +import org.redisson.connection.MasterSlaveEntry; import java.util.List; @@ -32,7 +33,7 @@ public interface SlotCallback { return null; } - default Object[] createKeys(List params) { + default Object[] createKeys(MasterSlaveEntry entry, List params) { return params.toArray(); } @@ -40,7 +41,7 @@ public interface SlotCallback { return params.toArray(); } - void onSlotResult(T result); + void onSlotResult(List keys, T result); R onFinish(); diff --git a/redisson/src/main/java/org/redisson/VoidSlotCallback.java b/redisson/src/main/java/org/redisson/VoidSlotCallback.java index 6f8c4bd79..c9dc9a753 100644 --- a/redisson/src/main/java/org/redisson/VoidSlotCallback.java +++ b/redisson/src/main/java/org/redisson/VoidSlotCallback.java @@ -38,7 +38,7 @@ public class VoidSlotCallback implements SlotCallback { } @Override - public void onSlotResult(Void res) { + public void onSlotResult(List keys, Void res) { } @Override diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 6a618ff61..cb3e5eb6e 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -651,7 +651,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { private RFuture evalBatchedAsync(boolean readOnly, Codec codec, RedisCommand command, String script, List keys, SlotCallback callback) { if (!connectionManager.isClusterMode()) { - Object[] keysArray = callback.createKeys(keys); + Object[] keysArray = callback.createKeys(null, keys); Object[] paramsArray = callback.createParams(Collections.emptyList()); if (readOnly) { return evalReadAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray); @@ -663,7 +663,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (keys.isEmpty()) { entry2keys = connectionManager.getEntrySet().stream() .collect(Collectors.toMap(Function.identity(), - e -> Collections.singletonMap(0, Collections.emptyList()))); + e -> Collections.singletonMap(0, new ArrayList<>()))); } else { entry2keys = keys.stream().collect( Collectors.groupingBy(k -> { @@ -687,7 +687,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }, Collectors.toList()))); } - List> futures = new ArrayList<>(); + Map, CompletableFuture> futures = new IdentityHashMap<>(); for (Entry>> entry : entry2keys.entrySet()) { // executes in batch due to CROSSLOT error CommandBatchService executorService; @@ -703,14 +703,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (newCommand != null) { c = newCommand; } - Object[] keysArray = callback.createKeys(groupedKeys); + Object[] keysArray = callback.createKeys(entry.getKey(), groupedKeys); Object[] paramsArray = callback.createParams(Collections.emptyList()); if (readOnly) { RFuture f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); - futures.add(f.toCompletableFuture()); + futures.put(groupedKeys, f.toCompletableFuture()); } else { RFuture f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); - futures.add(f.toCompletableFuture()); + futures.put(groupedKeys, f.toCompletableFuture()); } } @@ -719,10 +719,10 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } - CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + CompletableFuture future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])); CompletableFuture result = future.thenApply(r -> { - futures.forEach(f -> { - callback.onSlotResult((T) f.join()); + futures.entrySet().forEach(e -> { + callback.onSlotResult(e.getKey(), (T) e.getValue().join()); }); return callback.onFinish(); }); @@ -760,7 +760,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } }, Collectors.toList()))); - List> futures = new ArrayList<>(); + Map, CompletableFuture> futures = new IdentityHashMap<>(); List> mainFutures = new ArrayList<>(); for (Entry>> entry : entry2keys.entrySet()) { // executes in batch due to CROSSLOT error @@ -780,10 +780,10 @@ public class CommandAsyncService implements CommandAsyncExecutor { Object[] params = callback.createParams(groupedKeys); if (readOnly) { RFuture f = executorService.readAsync(entry.getKey(), codec, c, params); - futures.add(f.toCompletableFuture()); + futures.put(groupedKeys, f.toCompletableFuture()); } else { RFuture f = executorService.writeAsync(entry.getKey(), codec, c, params); - futures.add(f.toCompletableFuture()); + futures.put(groupedKeys, f.toCompletableFuture()); } } @@ -797,12 +797,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (!mainFutures.isEmpty()) { future = CompletableFuture.allOf(mainFutures.toArray(new CompletableFuture[0])); } else { - future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])); } CompletableFuture result = future.thenApply(r -> { - futures.forEach(f -> { - if (!f.isCompletedExceptionally() && f.getNow(null) != null) { - callback.onSlotResult((T) f.getNow(null)); + futures.entrySet().forEach(e -> { + if (!e.getValue().isCompletedExceptionally() && e.getValue().getNow(null) != null) { + callback.onSlotResult(e.getKey(), (T) e.getValue().getNow(null)); } }); return callback.onFinish(); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index f9ccc55d3..db0315577 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -205,7 +205,54 @@ public class RedissonTopicTest extends RedisDockerTest { redisson.shutdown(); } - + + @Test + public void test1() throws InterruptedException { + int loops = 10; + AtomicInteger counter = new AtomicInteger(); + for (int j = 0; j < loops; j++) { + RTopic t = redisson.getTopic("PUBSUB_" + j); + t.addListener(String.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, String msg) { +// System.out.println("channel " + channel + " " + msg); + counter.incrementAndGet(); +// System.out.println("m " + counter.incrementAndGet()); + } + }); + } + + for (int s = 0; s < 100; s++) { + ExecutorService executor = Executors.newFixedThreadPool(16); + for (int k = 0; k < 100; k++) { + executor.execute(() -> { + for (int j = 0; j < loops; j++) { + for (int i = 0; i < 20; i++) { + RTopic t = redisson.getTopic("PUBSUB_" + j); + t.publishAsync("message " + j + "_" + i); + } + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + + Awaitility.waitAtMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(counter.get()).isEqualTo(loops * 20*100); + }); + counter.set(0); + System.out.println("s " + s); + } + + + } + @Test public void testConcurrentTopic() throws Exception { int threads = 16;