refactoring

pull/4061/head
Nikita Koksharov 3 years ago
parent d8322b246e
commit 94265fec64

@ -23,6 +23,7 @@ import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -34,6 +35,7 @@ import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
/**
@ -261,21 +263,14 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
@Override
public Future<Integer> write(ByteBuffer src) {
RPromise<Integer> result = new RedissonPromise<>();
ByteBuf b = Unpooled.wrappedBuffer(src);
RFuture<Long> res = commandExecutor.writeAsync(getRawName(), codec, RedisCommands.SETRANGE, getRawName(), position, b);
res.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Integer> f = res.thenApply(r -> {
position += b.readableBytes();
result.trySuccess(b.readableBytes());
return b.readableBytes();
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override

@ -27,10 +27,7 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.*;
import org.redisson.executor.params.*;
import org.redisson.misc.Injector;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.*;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Result;
@ -338,7 +335,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
public void execute(Runnable task) {
check(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncServiceWithoutResult.executeRunnable(
createTaskParameters(task));
createTaskParameters(task)).toCompletableFuture();
syncExecute(promise);
}
@ -572,14 +569,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <T> RExecutorFuture<T> submit(Callable<T> task) {
RemotePromise<T> promise = (RemotePromise<T>) ((PromiseDelegator<T>) submitAsync(task)).getInnerPromise();
RemotePromise<T> promise = (RemotePromise<T>) submitAsync(task).toCompletableFuture();
syncExecute(promise);
return createFuture(promise);
}
@Override
public <T> RExecutorFuture<T> submit(Callable<T> task, long timeToLive, TimeUnit timeUnit) {
RemotePromise<T> promise = (RemotePromise<T>) ((PromiseDelegator<T>) submitAsync(task, timeToLive, timeUnit)).getInnerPromise();
RemotePromise<T> promise = (RemotePromise<T>) submitAsync(task, timeToLive, timeUnit).toCompletableFuture();
syncExecute(promise);
return createFuture(promise);
}
@ -597,7 +594,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
check(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(createTaskParameters(task));
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(createTaskParameters(task)).toCompletableFuture();
addListener(result);
return createFuture(result);
}
@ -613,7 +610,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) {
check(task);
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task));
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task)).toCompletableFuture();
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -649,7 +646,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<>();
for (Callable<?> task : tasks) {
check(task);
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task));
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task)).toCompletableFuture();
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -680,12 +677,12 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private <T> void addListener(RemotePromise<T> result) {
result.getAddFuture().whenComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
result.toCompletableFuture().completeExceptionally(e);
return;
}
if (!res) {
result.tryFailure(new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"));
result.toCompletableFuture().completeExceptionally(new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"));
}
});
}
@ -713,16 +710,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <T> RExecutorFuture<T> submit(Runnable task, T result) {
RPromise<T> resultFuture = new RedissonPromise<T>();
RemotePromise<T> future = (RemotePromise<T>) ((PromiseDelegator<T>) submit(task)).getInnerPromise();
future.onComplete((res, e) -> {
if (e != null) {
resultFuture.tryFailure(e);
return;
}
resultFuture.trySuccess(result);
});
return new RedissonExecutorFuture<T>(resultFuture, future.getRequestId());
RemotePromise<T> future = (RemotePromise<T>) submit(task).toCompletableFuture();
CompletableFuture<T> f = future.thenApply(res -> result);
return new RedissonExecutorFuture<T>(f, future.getRequestId());
}
@Override
@ -736,7 +726,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) {
check(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task)).toCompletableFuture();
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -760,7 +750,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<>();
for (Runnable task : tasks) {
check(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task)).toCompletableFuture();
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -790,14 +780,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RExecutorFuture<?> submit(Runnable task) {
RemotePromise<Void> promise = (RemotePromise<Void>) ((PromiseDelegator<Void>) submitAsync(task)).getInnerPromise();
RemotePromise<Void> promise = (RemotePromise<Void>) submitAsync(task).toCompletableFuture();
syncExecute(promise);
return createFuture(promise);
}
@Override
public RExecutorFuture<?> submit(Runnable task, long timeToLive, TimeUnit timeUnit) {
RemotePromise<Void> promise = (RemotePromise<Void>) ((PromiseDelegator<Void>) submitAsync(task, timeToLive, timeUnit)).getInnerPromise();
RemotePromise<Void> promise = (RemotePromise<Void>) submitAsync(task, timeToLive, timeUnit).toCompletableFuture();
syncExecute(promise);
return createFuture(promise);
}
@ -807,7 +797,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
TaskParameters taskParameters = createTaskParameters(task);
taskParameters.setTtl(timeUnit.toMillis(timeToLive));
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(taskParameters);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(taskParameters).toCompletableFuture();
addListener(result);
return createFuture(result);
}
@ -815,7 +805,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RExecutorFuture<?> submitAsync(Runnable task) {
check(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task)).toCompletableFuture();
addListener(result);
return createFuture(result);
}
@ -842,7 +832,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, delay, unit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
RemotePromise<?> rp = future.getInnerPromise();
syncExecute(rp);
return future;
}
@ -867,12 +857,12 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
references.remove(r);
if (!r.getPromise().hasListeners()) {
if (r.getPromise().getNumberOfDependents() == 0) {
cancelResponseHandling(r.getRequestId());
}
}
RPromise<?> promise = ((PromiseDelegator<?>) future).getInnerPromise();
CompletableFuture<?> promise = ((CompletableFutureWrapper<?>) future).toCompletableFuture();
RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue, promise);
references.add(reference);
}
@ -885,7 +875,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
RemotePromise<?> rp = future.getInnerPromise();
syncExecute(rp);
return future;
}
@ -898,7 +888,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit, long ttl, TimeUnit ttlUnit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(command, delay, unit, ttl, ttlUnit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
RemotePromise<?> rp = future.getInnerPromise();
syncExecute(rp);
return future;
}
@ -913,7 +903,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
if (timeToLive > 0) {
params.setTtl(ttlUnit.toMillis(timeToLive));
}
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(params);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(params).toCompletableFuture();
addListener(result);
return createFuture(result, startTime);
}
@ -921,7 +911,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <V> RScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit) {
RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(callable, delay, unit, timeToLive, ttlUnit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
RemotePromise<?> rp = future.getInnerPromise();
syncExecute(rp);
return future;
}
@ -936,7 +926,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
if (timeToLive > 0) {
params.setTtl(ttlUnit.toMillis(timeToLive));
}
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(params);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(params).toCompletableFuture();
addListener(result);
return createFuture(result, startTime);
}
@ -944,7 +934,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
RemotePromise<?> rp = future.getInnerPromise();
syncExecute(rp);
return future;
}
@ -963,7 +953,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
params.setStartTime(startTime);
params.setPeriod(unit.toMillis(period));
params.setExecutorId(executorId);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(params);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(params).toCompletableFuture();
addListener(result);
return createFuture(result, startTime);
}
@ -971,7 +961,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, cronSchedule);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
RemotePromise<?> rp = future.getInnerPromise();
syncExecute(rp);
return future;
}
@ -997,7 +987,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
params.setCronExpression(cronSchedule.getExpression().getExpr());
params.setTimezone(ZoneId.systemDefault().toString());
params.setExecutorId(executorId);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(params);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(params).toCompletableFuture();
addListener(result);
RedissonScheduledFuture<Void> f = new RedissonScheduledFuture<Void>(result, startTime) {
public long getDelay(TimeUnit unit) {
@ -1011,7 +1001,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
RemotePromise<?> rp = future.getInnerPromise();
syncExecute(rp);
return future;
}
@ -1031,7 +1021,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
params.setStartTime(startTime);
params.setDelay(unit.toMillis(delay));
params.setExecutorId(executorId);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(params);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(params).toCompletableFuture();
addListener(result);
return createFuture(result, startTime);
}

@ -115,17 +115,17 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
@Override
public boolean tryTransfer(V v) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v).toCompletableFuture();
boolean added = commandExecutor.get(future.getAddFuture());
if (added && !future.cancel(false)) {
get(future);
commandExecutor.get(future);
return true;
}
return false;
}
public RFuture<Boolean> tryTransferAsync(V v) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v).toCompletableFuture();
CompletableFuture<Boolean> result = future.getAddFuture().thenCompose(added -> {
if (!added) {
return CompletableFuture.completedFuture(false);
@ -156,7 +156,7 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
@Override
public boolean tryTransfer(V v, long timeout, TimeUnit unit) throws InterruptedException {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v).toCompletableFuture();
long remainTime = unit.toMillis(timeout);
long startTime = System.currentTimeMillis();
CompletableFuture<Boolean> addFuture = future.getAddFuture().toCompletableFuture();
@ -190,7 +190,7 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
RPromise<Boolean> result = new RedissonPromise<>();
result.setUncancellable();
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v).toCompletableFuture();
long remainTime = unit.toMillis(timeout);
long startTime = System.currentTimeMillis();
@ -202,7 +202,7 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
}, remainTime, TimeUnit.MILLISECONDS);
future.onComplete((res, exc) -> {
future.whenComplete((res, exc) -> {
if (future.isCancelled()) {
result.trySuccess(false);
return;
@ -274,12 +274,12 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
@Override
public boolean add(V v) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v).toCompletableFuture();
return commandExecutor.get(future.getAddFuture());
}
public RFuture<Boolean> addAsync(V v) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v).toCompletableFuture();
return new CompletableFutureWrapper<>(future.getAddFuture());
}
@ -654,7 +654,7 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
@Override
public RFuture<Void> putAsync(V value) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(value);
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(value).toCompletableFuture();
CompletableFuture<Void> f = future.getAddFuture().thenApply(r -> null);
return new CompletableFutureWrapper<>(f);
}

@ -59,6 +59,8 @@ public interface CommandAsyncExecutor {
<V> V getInterrupted(RFuture<V> future) throws InterruptedException;
<V> V getInterrupted(CompletableFuture<V> future) throws InterruptedException;
<T, R> RFuture<R> writeAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);

@ -180,6 +180,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
@Override
public <V> V getInterrupted(CompletableFuture<V> future) throws InterruptedException {
try {
return future.get();
} catch (InterruptedException e) {
future.completeExceptionally(e);
throw e;
} catch (ExecutionException e) {
throw convertException(e);
}
}
protected <R> CompletableFuture<R> createPromise() {
return new CompletableFuture<R>();
}

@ -16,17 +16,18 @@
package org.redisson.executor;
import org.redisson.api.RExecutorFuture;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.remote.RequestId;
import java.util.concurrent.CompletableFuture;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonExecutorFuture<V> extends PromiseDelegator<V> implements RExecutorFuture<V> {
public class RedissonExecutorFuture<V> extends CompletableFutureWrapper<V> implements RExecutorFuture<V> {
private final RequestId taskId;
@ -34,7 +35,7 @@ public class RedissonExecutorFuture<V> extends PromiseDelegator<V> implements RE
this(promise, promise.getRequestId());
}
public RedissonExecutorFuture(RPromise<V> promise, RequestId taskId) {
public RedissonExecutorFuture(CompletableFuture<V> promise, RequestId taskId) {
super(promise);
this.taskId = taskId;
}

@ -15,13 +15,13 @@
*/
package org.redisson.executor;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import org.redisson.api.RExecutorFuture;
import org.redisson.misc.RPromise;
import org.redisson.remote.RequestId;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.CompletableFuture;
/**
*
* @author Nikita Koksharov
@ -29,16 +29,16 @@ import org.redisson.remote.RequestId;
*/
public class RedissonExecutorFutureReference extends WeakReference<RExecutorFuture<?>> {
private final RPromise<?> promise;
private final CompletableFuture<?> promise;
private final RequestId requestId;
public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q, RPromise<?> promise) {
public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q, CompletableFuture<?> promise) {
super(referent, q);
this.requestId = requestId;
this.promise = promise;
}
public RPromise<?> getPromise() {
public CompletableFuture<?> getPromise() {
return promise;
}

@ -15,28 +15,34 @@
*/
package org.redisson.executor;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RScheduledFuture;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.remote.RequestId;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonScheduledFuture<V> extends PromiseDelegator<V> implements RScheduledFuture<V> {
public class RedissonScheduledFuture<V> extends CompletableFutureWrapper<V> implements RScheduledFuture<V> {
private final long scheduledExecutionTime;
private final RequestId taskId;
private final RemotePromise<V> promise;
public RedissonScheduledFuture(RemotePromise<V> promise, long scheduledExecutionTime) {
super(promise);
this.scheduledExecutionTime = scheduledExecutionTime;
this.taskId = promise.getRequestId();
this.promise = promise;
}
public RemotePromise<V> getInnerPromise() {
return promise;
}
@Override

@ -15,7 +15,6 @@
*/
package org.redisson.executor;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId;
import java.util.concurrent.CompletableFuture;
@ -25,7 +24,7 @@ import java.util.concurrent.CompletableFuture;
* @author Nikita Koksharov
*
*/
public class RemotePromise<T> extends RedissonPromise<T> {
public class RemotePromise<T> extends CompletableFuture<T> {
private final RequestId requestId;
private CompletableFuture<Boolean> addFuture;

@ -1,299 +0,0 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
/**
*
* @author Nikita Koksharov
*
* @param <T> type
*/
public class PromiseDelegator<T> implements RPromise<T> {
private final RPromise<T> promise;
public PromiseDelegator(RPromise<T> promise) {
super();
this.promise = promise;
}
public RPromise<T> getInnerPromise() {
return promise;
}
public T join() {
return promise.join();
}
public boolean isSuccess() {
return promise.isSuccess();
}
public boolean trySuccess(T result) {
return promise.trySuccess(result);
}
public Throwable cause() {
return promise.cause();
}
public T getNow() {
return promise.getNow();
}
public boolean tryFailure(Throwable cause) {
return promise.tryFailure(cause);
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return promise.await(timeout, unit);
}
public boolean setUncancellable() {
return promise.setUncancellable();
}
public boolean await(long timeoutMillis) throws InterruptedException {
return promise.await(timeoutMillis);
}
public RPromise<T> await() throws InterruptedException {
return promise.await();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return promise.cancel(mayInterruptIfRunning);
}
public RPromise<T> awaitUninterruptibly() {
return promise.awaitUninterruptibly();
}
public RPromise<T> sync() throws InterruptedException {
return promise.sync();
}
public RPromise<T> syncUninterruptibly() {
return promise.syncUninterruptibly();
}
public boolean isCancelled() {
return promise.isCancelled();
}
public boolean isDone() {
return promise.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return promise.get();
}
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return promise.awaitUninterruptibly(timeout, unit);
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return promise.get(timeout, unit);
}
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn) {
return promise.thenApply(fn);
}
public boolean awaitUninterruptibly(long timeoutMillis) {
return promise.awaitUninterruptibly(timeoutMillis);
}
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
return promise.thenApplyAsync(fn);
}
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
return promise.thenApplyAsync(fn, executor);
}
public CompletionStage<Void> thenAccept(Consumer<? super T> action) {
return promise.thenAccept(action);
}
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) {
return promise.thenAcceptAsync(action);
}
@Override
public boolean hasListeners() {
return promise.hasListeners();
}
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
return promise.thenAcceptAsync(action, executor);
}
public CompletionStage<Void> thenRun(Runnable action) {
return promise.thenRun(action);
}
public CompletionStage<Void> thenRunAsync(Runnable action) {
return promise.thenRunAsync(action);
}
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
return promise.thenRunAsync(action, executor);
}
public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn) {
return promise.thenCombine(other, fn);
}
public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn) {
return promise.thenCombineAsync(other, fn);
}
public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
return promise.thenCombineAsync(other, fn, executor);
}
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return promise.thenAcceptBoth(other, action);
}
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return promise.thenAcceptBothAsync(other, action);
}
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return promise.thenAcceptBothAsync(other, action, executor);
}
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
return promise.runAfterBoth(other, action);
}
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
return promise.runAfterBothAsync(other, action);
}
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
return promise.runAfterBothAsync(other, action, executor);
}
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
return promise.applyToEither(other, fn);
}
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
return promise.applyToEitherAsync(other, fn);
}
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return promise.applyToEitherAsync(other, fn, executor);
}
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
return promise.acceptEither(other, action);
}
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
return promise.acceptEitherAsync(other, action);
}
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return promise.acceptEitherAsync(other, action, executor);
}
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
return promise.runAfterEither(other, action);
}
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
return promise.runAfterEitherAsync(other, action);
}
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
return promise.runAfterEitherAsync(other, action, executor);
}
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
return promise.thenCompose(fn);
}
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
return promise.thenComposeAsync(fn);
}
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return promise.thenComposeAsync(fn, executor);
}
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn) {
return promise.exceptionally(fn);
}
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
return promise.whenComplete(action);
}
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
return promise.whenCompleteAsync(action);
}
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return promise.whenCompleteAsync(action, executor);
}
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
return promise.handle(fn);
}
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
return promise.handleAsync(fn);
}
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return promise.handleAsync(fn, executor);
}
public CompletableFuture<T> toCompletableFuture() {
return promise.toCompletableFuture();
}
@Override
public void onComplete(BiConsumer<? super T, ? super Throwable> action) {
promise.onComplete(action);
}
}

@ -15,13 +15,12 @@
*/
package org.redisson.reactive;
import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
/**
*
* @author Nikita Koksharov

@ -21,7 +21,6 @@ import java.util.concurrent.CompletionException;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -53,7 +52,7 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
future.cancel(true);
});
future.onComplete((v, e) -> {
future.whenComplete((v, e) -> {
if (e != null) {
if (e instanceof CompletionException) {
e = e.getCause();

@ -30,6 +30,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -140,12 +141,12 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
if (ackFuture != null) {
ackFuture.cancel(false);
}
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (!res) {
result.tryFailure(new RedisException("Task hasn't been added"));
result.completeExceptionally(new RedisException("Task hasn't been added"));
if (responseFuture != null) {
responseFuture.cancel(false);
}
@ -162,7 +163,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
responseFuture.cancel(false);
}
result.tryFailure(ex);
result.completeExceptionally(ex);
return;
}
@ -172,7 +173,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
tryPollAckAgainAsync(optionsCopy, ackName, requestId);
ackFutureAttempt.onComplete((re, ex2) -> {
if (ex2 != null) {
result.tryFailure(ex2);
result.completeExceptionally(ex2);
return;
}
@ -181,7 +182,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
"No ACK response after "
+ optionsCopy.getAckTimeoutInMillis()
+ "ms for request: " + requestId);
result.tryFailure(exc);
result.completeExceptionally(exc);
return;
}
@ -204,7 +205,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
return result;
return new CompletableFutureWrapper<>(result);
}
private void awaitResultAsync(RemoteInvocationOptions optionsCopy, RemotePromise<Object> result,
@ -212,7 +213,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
RFuture<Boolean> deleteFuture = new RedissonBucket<>(commandExecutor, ackName).deleteAsync();
deleteFuture.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
@ -229,14 +230,14 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
responseFuture.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
if (res == null) {
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + result.getRequestId());
result.tryFailure(ex);
result.completeExceptionally(ex);
return;
}
@ -247,11 +248,11 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
RemoteServiceResponse response = (RemoteServiceResponse) res;
if (response.getError() != null) {
result.tryFailure(response.getError());
result.completeExceptionally(response.getError());
return;
}
result.trySuccess(response.getResult());
result.complete(response.getResult());
});
}
@ -394,7 +395,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
cancelExecution(optionsCopy, mayInterruptIfRunning, promise, cancelRequestMapName);
return promise.thenApply(r -> promise.isCancelled());
return promise.toCompletableFuture().thenApply(r -> promise.isCancelled());
}
private void cancelExecution(RemoteInvocationOptions optionsCopy,

@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.reactive.CommandReactiveExecutor;
import reactor.core.publisher.Mono;
@ -47,7 +48,7 @@ public class ReactiveRemoteProxy extends AsyncRemoteProxy {
@Override
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
return ((CommandReactiveExecutor) commandExecutor).reactive(() -> result);
return ((CommandReactiveExecutor) commandExecutor).reactive(() -> new CompletableFutureWrapper<>(result));
}
}

@ -45,11 +45,11 @@ public class CommandRxBatchService extends CommandRxService {
}
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
Flowable<R> flowable = super.flowable(new Callable<RFuture<R>>() {
volatile RFuture<R> future;
public <R> Flowable<R> flowable(Callable<CompletableFuture<R>> supplier) {
Flowable<R> flowable = super.flowable(new Callable<CompletableFuture<R>>() {
volatile CompletableFuture<R> future;
@Override
public RFuture<R> call() throws Exception {
public CompletableFuture<R> call() throws Exception {
if (future == null) {
synchronized (this) {
if (future == null) {
@ -75,11 +75,11 @@ public class CommandRxBatchService extends CommandRxService {
return batchService.async(readOnlyMode, nodeSource, codec, command, params, ignoreRedirect, noRetry);
}
public RFuture<BatchResult<?>> executeAsync() {
return batchService.executeAsync();
public CompletableFuture<BatchResult<?>> executeAsync() {
return batchService.executeAsync().toCompletableFuture();
}
public RFuture<Void> discardAsync() {
return batchService.discardAsync();
public CompletableFuture<Void> discardAsync() {
return batchService.discardAsync().toCompletableFuture();
}
}

@ -15,12 +15,11 @@
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import io.reactivex.rxjava3.core.Flowable;
import org.redisson.command.CommandAsyncExecutor;
import io.reactivex.rxjava3.core.Flowable;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
/**
*
@ -29,6 +28,6 @@ import io.reactivex.rxjava3.core.Flowable;
*/
public interface CommandRxExecutor extends CommandAsyncExecutor {
<R> Flowable<R> flowable(Callable<RFuture<R>> supplier);
<R> Flowable<R> flowable(Callable<CompletableFuture<R>> supplier);
}

@ -15,19 +15,18 @@
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.ReplayProcessor;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
/**
*
* @author Nikita Koksharov
@ -40,12 +39,12 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
}
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
public <R> Flowable<R> flowable(Callable<CompletableFuture<R>> supplier) {
ReplayProcessor<R> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
RFuture<R> future;
CompletableFuture<R> future;
try {
future = supplier.call();
} catch (Exception e) {
@ -59,7 +58,7 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
}
});
future.onComplete((res, e) -> {
future.whenComplete((res, e) -> {
if (e != null) {
if (e instanceof CompletionException) {
e = e.getCause();

@ -46,11 +46,11 @@ public class RedissonBinaryStreamRx {
}
public Single<Integer> read(ByteBuffer buf) {
return commandExecutor.flowable(() -> (RFuture<Integer>) channel.read(buf)).singleOrError();
return commandExecutor.flowable(() -> ((RFuture<Integer>) channel.read(buf)).toCompletableFuture()).singleOrError();
}
public Single<Integer> write(ByteBuffer buf) {
return commandExecutor.flowable(() -> (RFuture<Integer>) channel.write(buf)).singleOrError();
return commandExecutor.flowable(() -> ((RFuture<Integer>) channel.write(buf)).toCompletableFuture()).singleOrError();
}
}

@ -116,12 +116,12 @@ public class RedissonTransactionRx implements RTransactionRx {
@Override
public Completable commit() {
return executorService.flowable(() -> transaction.commitAsync()).ignoreElements();
return executorService.flowable(() -> transaction.commitAsync().toCompletableFuture()).ignoreElements();
}
@Override
public Completable rollback() {
return executorService.flowable(() -> transaction.rollbackAsync()).ignoreElements();
return executorService.flowable(() -> transaction.rollbackAsync().toCompletableFuture()).ignoreElements();
}
}

@ -15,16 +15,14 @@
*/
package org.redisson.rx;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import org.redisson.api.RFuture;
import org.redisson.misc.ProxyBuilder;
import org.redisson.misc.ProxyBuilder.Callback;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.lang.reflect.Method;
/**
*
@ -41,12 +39,7 @@ public class RxProxyBuilder {
return ProxyBuilder.create(new Callback() {
@Override
public Object execute(Method mm, Object instance, Method instanceMethod, Object[] args) {
Flowable<Object> flowable = commandExecutor.flowable(new Callable<RFuture<Object>>() {
@Override
public RFuture<Object> call() throws Exception {
return (RFuture<Object>) mm.invoke(instance, args);
}
});
Flowable<Object> flowable = commandExecutor.flowable(() -> ((RFuture<Object>) mm.invoke(instance, args)).toCompletableFuture());
if (instanceMethod.getReturnType() == Completable.class) {
return flowable.ignoreElements();

Loading…
Cancel
Save