refactoring

pull/5038/head
Nikita Koksharov 2 years ago
parent 5f000af7a6
commit 2bac289bc2

@ -49,7 +49,7 @@ public interface CommandAsyncExecutor {
RedisException convertException(ExecutionException e);
<V> void transfer(CompletableFuture<V> future1, CompletableFuture<V> future2);
<V> void transfer(CompletionStage<V> future1, CompletableFuture<V> future2);
<V> V getNow(CompletableFuture<V> future);

@ -89,7 +89,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <V> void transfer(CompletableFuture<V> future1, CompletableFuture<V> future2) {
public <V> void transfer(CompletionStage<V> future1, CompletableFuture<V> future2) {
future1.whenComplete((res, e) -> {
if (e != null) {
future2.completeExceptionally(e);

@ -24,10 +24,12 @@ import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CompletableFutureWrapper;
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -46,17 +48,14 @@ public class CommandReactiveBatchService extends CommandReactiveService {
@Override
public <R> Mono<R> reactive(Callable<RFuture<R>> supplier) {
Mono<R> mono = super.reactive(new Callable<RFuture<R>>() {
volatile RFuture<R> future;
final CompletableFuture<R> future = new CompletableFuture<>();
final AtomicBoolean lock = new AtomicBoolean();
@Override
public RFuture<R> call() throws Exception {
if (future == null) {
synchronized (this) {
if (future == null) {
future = supplier.call();
}
}
if (lock.compareAndSet(false, true)) {
transfer(supplier.call().toCompletableFuture(), future);
}
return future;
return new CompletableFutureWrapper<>(future);
}
});
mono.subscribe();

@ -26,9 +26,11 @@ import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -47,17 +49,14 @@ public class CommandRxBatchService extends CommandRxService {
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
Flowable<R> flowable = super.flowable(new Callable<RFuture<R>>() {
volatile RFuture<R> future;
final CompletableFuture<R> future = new CompletableFuture<>();
final AtomicBoolean lock = new AtomicBoolean();
@Override
public RFuture<R> call() throws Exception {
if (future == null) {
synchronized (this) {
if (future == null) {
future = supplier.call();
}
}
if (lock.compareAndSet(false, true)) {
transfer(supplier.call().toCompletableFuture(), future);
}
return future;
return new CompletableFutureWrapper<>(future);
}
});
flowable.subscribe();

@ -99,7 +99,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
@ParameterizedTest
@MethodSource("data")
public void testPerformance(BatchOptions batchOptions) {
Assertions.assertTimeout(Duration.ofSeconds(20), () -> {
Assertions.assertTimeout(Duration.ofSeconds(21), () -> {
RMapRx<String, String> map = redisson.getMap("map");
Map<String, String> m = new HashMap<String, String>();
for (int j = 0; j < 1000; j++) {

Loading…
Cancel
Save