refactoring

pull/5732/head
Nikita Koksharov 11 months ago
parent 469c8f3dd1
commit 8d0ca5f99b

@ -38,7 +38,7 @@ public class BooleanSlotCallback implements SlotCallback<Boolean, Boolean> {
}
@Override
public void onSlotResult(Boolean res) {
public void onSlotResult(List<Object> keys, Boolean res) {
if (res) {
r.set(true);
}

@ -38,7 +38,7 @@ public class IntegerSlotCallback implements SlotCallback<Integer, Integer> {
}
@Override
public void onSlotResult(Integer result) {
public void onSlotResult(List<Object> keys, Integer result) {
results.addAndGet(result);
}

@ -38,7 +38,7 @@ public class LongSlotCallback implements SlotCallback<Long, Long> {
}
@Override
public void onSlotResult(Long result) {
public void onSlotResult(List<Object> keys, Long result) {
results.addAndGet(result);
}

@ -87,7 +87,7 @@ public class RedissonBuckets implements RBuckets {
final Map<String, V> results = new ConcurrentHashMap<>();
@Override
public void onSlotResult(Map<Object, Object> result) {
public void onSlotResult(List<Object> keys, Map<Object, Object> result) {
for (Map.Entry<Object, Object> entry : result.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) entry.getKey());
@ -120,7 +120,7 @@ public class RedissonBuckets implements RBuckets {
final AtomicBoolean result = new AtomicBoolean(true);
@Override
public void onSlotResult(Boolean result) {
public void onSlotResult(List<Object> keys, Boolean result) {
if (!result && this.result.get()){
this.result.set(result);
}
@ -163,7 +163,7 @@ public class RedissonBuckets implements RBuckets {
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new SlotCallback<Void, Void>() {
@Override
public void onSlotResult(Void result) {
public void onSlotResult(List<Object> keys, Void result) {
}
@Override

@ -16,6 +16,7 @@
package org.redisson;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.MasterSlaveEntry;
import java.util.List;
@ -32,7 +33,7 @@ public interface SlotCallback<T, R> {
return null;
}
default Object[] createKeys(List<Object> params) {
default Object[] createKeys(MasterSlaveEntry entry, List<Object> params) {
return params.toArray();
}
@ -40,7 +41,7 @@ public interface SlotCallback<T, R> {
return params.toArray();
}
void onSlotResult(T result);
void onSlotResult(List<Object> keys, T result);
R onFinish();

@ -38,7 +38,7 @@ public class VoidSlotCallback implements SlotCallback<Void, Void> {
}
@Override
public void onSlotResult(Void res) {
public void onSlotResult(List<Object> keys, Void res) {
}
@Override

@ -651,7 +651,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <T, R> RFuture<R> evalBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
if (!connectionManager.isClusterMode()) {
Object[] keysArray = callback.createKeys(keys);
Object[] keysArray = callback.createKeys(null, keys);
Object[] paramsArray = callback.createParams(Collections.emptyList());
if (readOnly) {
return evalReadAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray);
@ -663,7 +663,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (keys.isEmpty()) {
entry2keys = connectionManager.getEntrySet().stream()
.collect(Collectors.toMap(Function.identity(),
e -> Collections.singletonMap(0, Collections.emptyList())));
e -> Collections.singletonMap(0, new ArrayList<>())));
} else {
entry2keys = keys.stream().collect(
Collectors.groupingBy(k -> {
@ -687,7 +687,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}, Collectors.toList())));
}
List<CompletableFuture<?>> futures = new ArrayList<>();
Map<List<Object>, CompletableFuture<?>> futures = new IdentityHashMap<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService;
@ -703,14 +703,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (newCommand != null) {
c = newCommand;
}
Object[] keysArray = callback.createKeys(groupedKeys);
Object[] keysArray = callback.createKeys(entry.getKey(), groupedKeys);
Object[] paramsArray = callback.createParams(Collections.emptyList());
if (readOnly) {
RFuture<T> f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.add(f.toCompletableFuture());
futures.put(groupedKeys, f.toCompletableFuture());
} else {
RFuture<T> f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.add(f.toCompletableFuture());
futures.put(groupedKeys, f.toCompletableFuture());
}
}
@ -719,10 +719,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<Void> future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
CompletableFuture<R> result = future.thenApply(r -> {
futures.forEach(f -> {
callback.onSlotResult((T) f.join());
futures.entrySet().forEach(e -> {
callback.onSlotResult(e.getKey(), (T) e.getValue().join());
});
return callback.onFinish();
});
@ -760,7 +760,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}, Collectors.toList())));
List<CompletableFuture<?>> futures = new ArrayList<>();
Map<List<Object>, CompletableFuture<?>> futures = new IdentityHashMap<>();
List<CompletableFuture<?>> mainFutures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
@ -780,10 +780,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Object[] params = callback.createParams(groupedKeys);
if (readOnly) {
RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, params);
futures.add(f.toCompletableFuture());
futures.put(groupedKeys, f.toCompletableFuture());
} else {
RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, params);
futures.add(f.toCompletableFuture());
futures.put(groupedKeys, f.toCompletableFuture());
}
}
@ -797,12 +797,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (!mainFutures.isEmpty()) {
future = CompletableFuture.allOf(mainFutures.toArray(new CompletableFuture[0]));
} else {
future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
}
CompletableFuture<R> result = future.thenApply(r -> {
futures.forEach(f -> {
if (!f.isCompletedExceptionally() && f.getNow(null) != null) {
callback.onSlotResult((T) f.getNow(null));
futures.entrySet().forEach(e -> {
if (!e.getValue().isCompletedExceptionally() && e.getValue().getNow(null) != null) {
callback.onSlotResult(e.getKey(), (T) e.getValue().getNow(null));
}
});
return callback.onFinish();

@ -205,7 +205,54 @@ public class RedissonTopicTest extends RedisDockerTest {
redisson.shutdown();
}
@Test
public void test1() throws InterruptedException {
int loops = 10;
AtomicInteger counter = new AtomicInteger();
for (int j = 0; j < loops; j++) {
RTopic t = redisson.getTopic("PUBSUB_" + j);
t.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String msg) {
// System.out.println("channel " + channel + " " + msg);
counter.incrementAndGet();
// System.out.println("m " + counter.incrementAndGet());
}
});
}
for (int s = 0; s < 100; s++) {
ExecutorService executor = Executors.newFixedThreadPool(16);
for (int k = 0; k < 100; k++) {
executor.execute(() -> {
for (int j = 0; j < loops; j++) {
for (int i = 0; i < 20; i++) {
RTopic t = redisson.getTopic("PUBSUB_" + j);
t.publishAsync("message " + j + "_" + i);
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
Awaitility.waitAtMost(Duration.ofSeconds(5)).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(loops * 20*100);
});
counter.set(0);
System.out.println("s " + s);
}
}
@Test
public void testConcurrentTopic() throws Exception {
int threads = 16;

Loading…
Cancel
Save