refactoring

pull/5420/head
Nikita Koksharov 1 year ago
parent 5d6748a321
commit 10cf69fd98

@ -0,0 +1,44 @@
package org.redisson;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author Nikita Koksharov
*
*/
public class BooleanSlotCallback implements SlotCallback<Boolean, Boolean> {
private final AtomicBoolean r = new AtomicBoolean();
private final Object[] params;
public BooleanSlotCallback() {
this(null);
}
public BooleanSlotCallback(Object[] params) {
this.params = params;
}
@Override
public void onSlotResult(Boolean res) {
if (res) {
r.set(true);
}
}
@Override
public Boolean onFinish() {
return r.get();
}
@Override
public Object[] createParams(List<Object> params) {
if (params == null && this.params != null) {
return this.params;
}
return SlotCallback.super.createParams(params);
}
}

@ -0,0 +1,58 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author Nikita Koksharov
*
*/
public class IntegerSlotCallback implements SlotCallback<Integer, Integer> {
private final AtomicInteger results = new AtomicInteger();
private final Object[] params;
public IntegerSlotCallback() {
this(null);
}
public IntegerSlotCallback(Object[] params) {
this.params = params;
}
@Override
public void onSlotResult(Integer result) {
results.addAndGet(result);
}
@Override
public Integer onFinish() {
return results.get();
}
@Override
public Object[] createParams(List<Object> params) {
if (params == null && this.params != null) {
return this.params;
}
return SlotCallback.super.createParams(params);
}
}

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -26,6 +27,16 @@ public class LongSlotCallback implements SlotCallback<Long, Long> {
private final AtomicLong results = new AtomicLong();
private final Object[] params;
public LongSlotCallback() {
this(null);
}
public LongSlotCallback(Object[] params) {
this.params = params;
}
@Override
public void onSlotResult(Long result) {
results.addAndGet(result);
@ -35,4 +46,13 @@ public class LongSlotCallback implements SlotCallback<Long, Long> {
public Long onFinish() {
return results.get();
}
@Override
public Object[] createParams(List<Object> params) {
if (params == null && this.params != null) {
return this.params;
}
return SlotCallback.super.createParams(params);
}
}

@ -33,6 +33,9 @@ public interface SlotCallback<T, R> {
}
default Object[] createParams(List<Object> params) {
if (params == null) {
return new Object[]{};
}
return params.toArray();
}

@ -0,0 +1,41 @@
package org.redisson;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author Nikita Koksharov
*
*/
public class VoidSlotCallback implements SlotCallback<Void, Void> {
private final AtomicBoolean r = new AtomicBoolean();
private final Object[] params;
public VoidSlotCallback() {
this(null);
}
public VoidSlotCallback(Object[] params) {
this.params = params;
}
@Override
public void onSlotResult(Void res) {
}
@Override
public Void onFinish() {
return null;
}
@Override
public Object[] createParams(List<Object> params) {
if (this.params != null) {
return this.params;
}
return SlotCallback.super.createParams(params);
}
}

@ -46,6 +46,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -595,26 +596,33 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalWriteAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray);
}
Map<MasterSlaveEntry, Map<Integer, List<Object>>> entry2keys = keys.stream().collect(
Collectors.groupingBy(k -> {
int slot;
if (k instanceof String) {
slot = connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
slot = connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
return connectionManager.getWriteEntry(slot);
}, Collectors.groupingBy(k -> {
if (k instanceof String) {
return connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
return connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
}, Collectors.toList())));
Map<MasterSlaveEntry, Map<Integer, List<Object>>> entry2keys;
if (keys.isEmpty()) {
entry2keys = connectionManager.getEntrySet().stream()
.collect(Collectors.toMap(Function.identity(),
e -> Collections.singletonMap(0, Collections.emptyList())));
} else {
entry2keys = keys.stream().collect(
Collectors.groupingBy(k -> {
int slot;
if (k instanceof String) {
slot = connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
slot = connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
return connectionManager.getWriteEntry(slot);
}, Collectors.groupingBy(k -> {
if (k instanceof String) {
return connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
return connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
}, Collectors.toList())));
}
List<CompletableFuture<?>> futures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {

Loading…
Cancel
Save