refactoring

pull/6047/head
Nikita Koksharov 6 months ago
parent 3871d55fee
commit f86744d0ff

@ -15,8 +15,8 @@
*/ */
package org.redisson; package org.redisson;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* *
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public class BooleanSlotCallback implements SlotCallback<Boolean, Boolean> { public class BooleanSlotCallback implements SlotCallback<Boolean, Boolean> {
private final AtomicBoolean r = new AtomicBoolean();
private final Object[] params; private final Object[] params;
public BooleanSlotCallback() { public BooleanSlotCallback() {
@ -38,15 +36,8 @@ public class BooleanSlotCallback implements SlotCallback<Boolean, Boolean> {
} }
@Override @Override
public void onSlotResult(List<Object> keys, Boolean res) { public Boolean onResult(Collection<Boolean> res) {
if (res) { return res.contains(true);
r.set(true);
}
}
@Override
public Boolean onFinish() {
return r.get();
} }
@Override @Override

@ -15,8 +15,8 @@
*/ */
package org.redisson; package org.redisson;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class IntegerSlotCallback implements SlotCallback<Integer, Integer> { public class IntegerSlotCallback implements SlotCallback<Integer, Integer> {
private final AtomicInteger results = new AtomicInteger();
private final Object[] params; private final Object[] params;
public IntegerSlotCallback() { public IntegerSlotCallback() {
@ -38,13 +36,8 @@ public class IntegerSlotCallback implements SlotCallback<Integer, Integer> {
} }
@Override @Override
public void onSlotResult(List<Object> keys, Integer result) { public Integer onResult(Collection<Integer> result) {
results.addAndGet(result); return result.stream().mapToInt(r -> r).sum();
}
@Override
public Integer onFinish() {
return results.get();
} }
@Override @Override

@ -15,8 +15,8 @@
*/ */
package org.redisson; package org.redisson;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* *
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class LongSlotCallback implements SlotCallback<Long, Long> { public class LongSlotCallback implements SlotCallback<Long, Long> {
private final AtomicLong results = new AtomicLong();
private final Object[] params; private final Object[] params;
public LongSlotCallback() { public LongSlotCallback() {
@ -38,13 +36,8 @@ public class LongSlotCallback implements SlotCallback<Long, Long> {
} }
@Override @Override
public void onSlotResult(List<Object> keys, Long result) { public Long onResult(Collection<Long> result) {
results.addAndGet(result); return result.stream().mapToLong(r -> r).sum();
}
@Override
public Long onFinish() {
return results.get();
} }
@Override @Override

@ -29,8 +29,6 @@ import org.redisson.misc.CompletableFutureWrapper;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -82,23 +80,18 @@ public class RedissonBuckets implements RBuckets {
Codec commandCodec = new CompositeCodec(StringCodec.INSTANCE, codec, codec); Codec commandCodec = new CompositeCodec(StringCodec.INSTANCE, codec, codec);
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("MGET", new MapGetAllDecoder(keysList, 0)); RedisCommand<Map<Object, Object>> command = new RedisCommand<>("MGET", new MapGetAllDecoder(keysList, 0));
return commandExecutor.readBatchedAsync(commandCodec, command, new SlotCallback<Map<Object, Object>, Map<String, V>>() { return commandExecutor.readBatchedAsync(commandCodec, command, new SlotCallback<Map<Object, Object>, Map<String, V>>() {
final Map<String, V> results = new ConcurrentHashMap<>();
@Override @Override
public void onSlotResult(List<Object> keys, Map<Object, Object> result) { public Map<String, V> onResult(Collection<Map<Object, Object>> result) {
for (Map.Entry<Object, Object> entry : result.entrySet()) { return result.stream()
if (entry.getKey() != null && entry.getValue() != null) { .flatMap(c -> c.entrySet().stream())
String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) entry.getKey()); .filter(e -> e.getKey() != null && e.getValue() != null)
results.put(key, (V) entry.getValue()); .map(e -> {
} String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) e.getKey());
} return new AbstractMap.SimpleEntry<>(key, (V) e.getValue());
} }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
@Override
public Map<String, V> onFinish() {
return results;
} }
@Override @Override
@ -116,21 +109,7 @@ public class RedissonBuckets implements RBuckets {
Map<String, ?> mappedBuckets = map(buckets); Map<String, ?> mappedBuckets = map(buckets);
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new SlotCallback<Boolean, Boolean>() { return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new BooleanSlotCallback() {
final AtomicBoolean result = new AtomicBoolean(true);
@Override
public void onSlotResult(List<Object> keys, Boolean result) {
if (!result && this.result.get()){
this.result.set(result);
}
}
@Override
public Boolean onFinish() {
return this.result.get();
}
@Override @Override
public Object[] createParams(List<Object> keys) { public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size()); List<Object> params = new ArrayList<>(keys.size());
@ -161,16 +140,7 @@ public class RedissonBuckets implements RBuckets {
Map<String, ?> mappedBuckets = map(buckets); Map<String, ?> mappedBuckets = map(buckets);
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new SlotCallback<Void, Void>() { return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new VoidSlotCallback() {
@Override
public void onSlotResult(List<Object> keys, Void result) {
}
@Override
public Void onFinish() {
return null;
}
@Override @Override
public Object[] createParams(List<Object> keys) { public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size()); List<Object> params = new ArrayList<>(keys.size());

@ -18,6 +18,7 @@ package org.redisson;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
@ -41,8 +42,6 @@ public interface SlotCallback<T, R> {
return params.toArray(); return params.toArray();
} }
void onSlotResult(List<Object> keys, T result); R onResult(Collection<T> result);
R onFinish();
} }

@ -15,8 +15,8 @@
*/ */
package org.redisson; package org.redisson;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* *
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public class VoidSlotCallback implements SlotCallback<Void, Void> { public class VoidSlotCallback implements SlotCallback<Void, Void> {
private final AtomicBoolean r = new AtomicBoolean();
private final Object[] params; private final Object[] params;
public VoidSlotCallback() { public VoidSlotCallback() {
@ -38,11 +36,7 @@ public class VoidSlotCallback implements SlotCallback<Void, Void> {
} }
@Override @Override
public void onSlotResult(List<Object> keys, Void res) { public Void onResult(Collection<Void> result) {
}
@Override
public Void onFinish() {
return null; return null;
} }

@ -729,7 +729,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}, Collectors.toList()))); }, Collectors.toList())));
} }
Map<List<Object>, CompletableFuture<?>> futures = new IdentityHashMap<>(); List<CompletableFuture<?>> futures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) { for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error // executes in batch due to CROSSLOT error
CommandBatchService executorService; CommandBatchService executorService;
@ -747,13 +747,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
Object[] keysArray = callback.createKeys(entry.getKey(), groupedKeys); Object[] keysArray = callback.createKeys(entry.getKey(), groupedKeys);
Object[] paramsArray = callback.createParams(Collections.emptyList()); Object[] paramsArray = callback.createParams(Collections.emptyList());
RFuture<T> f;
if (readOnly) { if (readOnly) {
RFuture<T> f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.put(groupedKeys, f.toCompletableFuture());
} else { } else {
RFuture<T> f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.put(groupedKeys, f.toCompletableFuture());
} }
futures.add(f.toCompletableFuture());
} }
if (!(this instanceof CommandBatchService)) { if (!(this instanceof CommandBatchService)) {
@ -761,12 +761,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
} }
CompletableFuture<Void> future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])); CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> result = future.thenApply(r -> { CompletableFuture<R> result = future.thenApply(r -> {
futures.entrySet().forEach(e -> { List<T> res = futures.stream()
callback.onSlotResult(e.getKey(), (T) e.getValue().join()); .map(e -> (T) e.join())
}); .collect(Collectors.toList());
return callback.onFinish(); return callback.onResult(res);
}); });
return new CompletableFutureWrapper<>(result); return new CompletableFutureWrapper<>(result);
@ -802,7 +802,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
}, Collectors.toList()))); }, Collectors.toList())));
Map<List<Object>, CompletableFuture<?>> futures = new IdentityHashMap<>(); List<CompletableFuture<?>> futures = new ArrayList<>();
List<CompletableFuture<?>> mainFutures = new ArrayList<>(); List<CompletableFuture<?>> mainFutures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) { for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error // executes in batch due to CROSSLOT error
@ -820,13 +820,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
c = newCommand; c = newCommand;
} }
Object[] params = callback.createParams(groupedKeys); Object[] params = callback.createParams(groupedKeys);
RFuture<T> f;
if (readOnly) { if (readOnly) {
RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, params); f = executorService.readAsync(entry.getKey(), codec, c, params);
futures.put(groupedKeys, f.toCompletableFuture());
} else { } else {
RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, params); f = executorService.writeAsync(entry.getKey(), codec, c, params);
futures.put(groupedKeys, f.toCompletableFuture());
} }
futures.add(f.toCompletableFuture());
} }
if (!(this instanceof CommandBatchService)) { if (!(this instanceof CommandBatchService)) {
@ -839,15 +839,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (!mainFutures.isEmpty()) { if (!mainFutures.isEmpty()) {
future = CompletableFuture.allOf(mainFutures.toArray(new CompletableFuture[0])); future = CompletableFuture.allOf(mainFutures.toArray(new CompletableFuture[0]));
} else { } else {
future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])); future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
} }
CompletableFuture<R> result = future.thenApply(r -> { CompletableFuture<R> result = future.thenApply(r -> {
futures.entrySet().forEach(e -> { List<T> res = futures.stream()
if (!e.getValue().isCompletedExceptionally() && e.getValue().getNow(null) != null) { .filter(e -> !e.isCompletedExceptionally() && e.getNow(null) != null)
callback.onSlotResult(e.getKey(), (T) e.getValue().getNow(null)); .map(e -> (T) e.join())
} .collect(Collectors.toList());
});
return callback.onFinish(); return callback.onResult(res);
}); });
return new CompletableFutureWrapper<>(result); return new CompletableFutureWrapper<>(result);

Loading…
Cancel
Save