refactoring

pull/4045/head
Nikita Koksharov 3 years ago
parent f97f6ec27a
commit a23572a552

@ -147,8 +147,12 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
boolean res = true;
for (Entry<RedisConnection, RFuture<String>> entry : result.entrySet()) {
RFuture<String> f = entry.getValue();
f.awaitUninterruptibly();
String pong = f.getNow();
String pong = null;
try {
pong = f.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
entry.getKey().closeAsync();
if (!"PONG".equals(pong)) {
res = false;

@ -136,11 +136,11 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
}
if (res == null) {
result.trySuccess(takeFuture.getNow());
result.trySuccess(res);
return;
}
createSemaphore(null).releaseAsync().onComplete((r, ex) -> {
result.trySuccess(takeFuture.getNow());
result.trySuccess(res);
});
});
return result;

@ -16,7 +16,10 @@
package org.redisson;
import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -129,7 +132,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return true;
}
RFuture<RedissonCountDownLatchEntry> promise = subscribe();
if (!promise.await(time, unit)) {
try {
promise.toCompletableFuture().get(time, unit);
} catch (ExecutionException | CancellationException e) {
// skip
} catch (TimeoutException e) {
return false;
}

@ -1107,8 +1107,12 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException();
}
RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()]));
future.await();
RExecutorBatchFuture future = submit(tasks.toArray(new Callable[0]));
try {
future.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
List<?> futures = future.getTaskFutures();
return (List<Future<T>>) futures;
}
@ -1120,8 +1124,12 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException();
}
RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()]));
future.await(timeout, unit);
RExecutorBatchFuture future = submit(tasks.toArray(new Callable[0]));
try {
future.toCompletableFuture().get(timeout, unit);
} catch (ExecutionException | TimeoutException | CancellationException e) {
// skip
}
List<?> futures = future.getTaskFutures();
return (List<Future<T>>) futures;
}

@ -29,7 +29,9 @@ import org.redisson.pubsub.LockPubSub;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -234,10 +236,12 @@ public class RedissonLock extends RedissonBaseLock {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
try {
subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
subscribeFuture.onComplete((res, ex) -> {
if (ex == null) {
unsubscribe(subscribeFuture, threadId);
}
});

@ -311,7 +311,13 @@ public class RedissonMultiLock implements RLock {
protected void unlockInner(Collection<RLock> locks) {
locks.stream()
.map(RLockAsync::unlockAsync)
.forEach(RFuture::awaitUninterruptibly);
.forEach(f -> {
try {
f.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
});
}
protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) {

@ -15,13 +15,7 @@
*/
package org.redisson;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map.Entry;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBufUtil;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -34,7 +28,12 @@ import org.redisson.connection.MasterSlaveEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBufUtil;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map.Entry;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
*
@ -169,21 +168,34 @@ public final class RedissonNode {
ConnectionManager connectionManager = ((Redisson) redisson).getConnectionManager();
for (MasterSlaveEntry entry : connectionManager.getEntrySet()) {
RFuture<RedisConnection> readFuture = entry.connectionReadOp(null);
if (readFuture.awaitUninterruptibly((long) connectionManager.getConfig().getConnectTimeout())
&& readFuture.isSuccess()) {
RedisConnection connection = readFuture.getNow();
entry.releaseRead(connection);
remoteAddress = (InetSocketAddress) connection.getChannel().remoteAddress();
localAddress = (InetSocketAddress) connection.getChannel().localAddress();
RedisConnection readConnection = null;
try {
readConnection = readFuture.toCompletableFuture().get(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
// skip
}
if (readConnection != null) {
entry.releaseRead(readConnection);
remoteAddress = (InetSocketAddress) readConnection.getChannel().remoteAddress();
localAddress = (InetSocketAddress) readConnection.getChannel().localAddress();
return;
}
RFuture<RedisConnection> writeFuture = entry.connectionWriteOp(null);
if (writeFuture.awaitUninterruptibly((long) connectionManager.getConfig().getConnectTimeout())
&& writeFuture.isSuccess()) {
RedisConnection connection = writeFuture.getNow();
entry.releaseWrite(connection);
remoteAddress = (InetSocketAddress) connection.getChannel().remoteAddress();
localAddress = (InetSocketAddress) connection.getChannel().localAddress();
RedisConnection writeConnection = null;
try {
writeConnection = writeFuture.toCompletableFuture().get(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
// skip
}
if (writeConnection != null) {
entry.releaseWrite(writeConnection);
remoteAddress = (InetSocketAddress) writeConnection.getChannel().remoteAddress();
localAddress = (InetSocketAddress) writeConnection.getChannel().localAddress();
return;
}
}

@ -17,8 +17,10 @@ package org.redisson;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -433,7 +435,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> future = subscribe();
if (!future.await(time, TimeUnit.MILLISECONDS)) {
try {
future.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
return null;
}

@ -183,9 +183,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RMap<String, RemoteServiceRequest> tasks = getMap(((RedissonObject) requestQueue).getRawName() + ":tasks");
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
commandExecutor.getInterrupted(taskFuture);
RemoteServiceRequest request = taskFuture.getNow();
RemoteServiceRequest request = commandExecutor.getInterrupted(taskFuture);
if (request == null) {
throw new IllegalStateException("Task can't be found for request: " + requestId);
}

@ -31,7 +31,10 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -301,7 +304,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> future = subscribe();
if (!future.await(time, TimeUnit.MILLISECONDS)) {
try {
future.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | CancellationException | TimeoutException e) {
log.debug("unable to subscribe for permits acquisition, permits: {}, name: {}", permits, getName());
return false;
}

@ -37,9 +37,15 @@ import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RemoteServiceRequest;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
/**
*
* @author Nikita Koksharov
@ -176,8 +182,11 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
long remainTime = unit.toMillis(timeout);
long startTime = System.currentTimeMillis();
if (!future.getAddFuture().await(remainTime, TimeUnit.MILLISECONDS)) {
if (!future.getAddFuture().cancel(false)) {
CompletableFuture<Boolean> addFuture = future.getAddFuture().toCompletableFuture();
try {
addFuture.get(remainTime, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!addFuture.cancel(false)) {
if (!future.cancel(false)) {
commandExecutor.getInterrupted(future);
return true;
@ -187,7 +196,9 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
}
remainTime -= System.currentTimeMillis() - startTime;
if (!future.await(remainTime)) {
try {
future.toCompletableFuture().get(remainTime, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!future.cancel(false)) {
commandExecutor.getInterrupted(future);
return true;

@ -65,8 +65,15 @@ public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionSt
V join();
/**
* Waits for this future to be completed within the
* specified time limit.
* Use snippet below instead.
*
* <pre>
* try {
* toCompletableFuture().get();
* } catch (Exception e) {
* // skip
* }
* </pre>
*
* @param timeout - wait timeout
* @param unit - time unit
@ -76,11 +83,19 @@ public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionSt
* @throws InterruptedException
* if the current thread was interrupted
*/
@Deprecated
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit.
* Use snippet below instead.
*
* <pre>
* try {
* toCompletableFuture().get();
* } catch (Exception e) {
* // skip
* }
* </pre>
*
* @param timeoutMillis - timeout value
* @return {@code true} if and only if the future was completed within
@ -89,6 +104,7 @@ public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionSt
* @throws InterruptedException
* if the current thread was interrupted
*/
@Deprecated
boolean await(long timeoutMillis) throws InterruptedException;
/**
@ -110,44 +126,74 @@ public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionSt
RFuture<V> syncUninterruptibly();
/**
* Waits for this future to be completed.
* Use snippet below instead.
*
* <pre>
* try {
* toCompletableFuture().get();
* } catch (Exception e) {
* // skip
* }
* </pre>
*
* @throws InterruptedException
* if the current thread was interrupted
* @return Future object
*/
@Deprecated
RFuture<V> await() throws InterruptedException;
/**
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*
* Use snippet below instead.
*
* <pre>
* try {
* rFuture.toCompletableFuture().join();
* } catch (Exception e) {
* // skip
* }
* </pre>
*
* @return Future object
*/
@Deprecated
RFuture<V> awaitUninterruptibly();
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
* Use snippet below instead.
*
* <pre>
* try {
* toCompletableFuture().get();
* } catch (Exception e) {
* // skip
* }
* </pre>
*
* @param timeout - timeout value
* @param unit - timeout unit value
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
@Deprecated
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* Use snippet below instead.
*
* <pre>
* try {
* toCompletableFuture().get();
* } catch (Exception e) {
* // skip
* }
* </pre>
*
* @param timeoutMillis - timeout value
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
@Deprecated
boolean awaitUninterruptibly(long timeoutMillis);
void onComplete(BiConsumer<? super V, ? super Throwable> action);

@ -28,6 +28,7 @@ import org.redisson.liveobject.core.RedissonObjectBuilder;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
*
@ -40,7 +41,7 @@ public interface CommandAsyncExecutor {
ConnectionManager getConnectionManager();
<V> RedisException convertException(RFuture<V> future);
RedisException convertException(ExecutionException e);
void syncSubscription(RFuture<?> future);

@ -45,8 +45,7 @@ import java.io.IOException;
import java.security.MessageDigest;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -84,18 +83,17 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public void syncSubscription(RFuture<?> future) {
MasterSlaveServersConfig config = connectionManager.getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
try {
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!future.await(timeout)) {
((RPromise<?>) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
future.toCompletableFuture().get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
future.toCompletableFuture().join();
} catch (CompletionException e) {
} catch (CancellationException e) {
// skip
} catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
} catch (TimeoutException e) {
((RPromise<?>) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
}
@ -103,13 +101,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public void syncSubscriptionInterrupted(RFuture<?> future) throws InterruptedException {
MasterSlaveServersConfig config = connectionManager.getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!future.await(timeout)) {
((RPromise<?>) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
try {
future.toCompletableFuture().get();
future.toCompletableFuture().get(timeout, TimeUnit.MILLISECONDS);
} catch (CancellationException e) {
// skip
} catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
} catch (TimeoutException e) {
((RPromise<?>) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
}
@ -120,33 +119,26 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
try {
future.await();
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
throw new RedisException(e);
} catch (ExecutionException e) {
throw convertException(e);
}
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
@Override
public <V> V getInterrupted(RFuture<V> future) throws InterruptedException {
try {
future.await();
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
((RPromise) future).tryFailure(e);
throw e;
} catch (ExecutionException e) {
throw convertException(e);
}
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
protected <R> RPromise<R> createPromise() {
@ -329,11 +321,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
public <V> RedisException convertException(RFuture<V> future) {
if (future.cause() instanceof RedisException) {
return (RedisException) future.cause();
public RedisException convertException(ExecutionException e) {
if (e.getCause() instanceof RedisException) {
return (RedisException) e.getCause();
}
return new RedisException("Unexpected exception while processing command", future.cause());
return new RedisException("Unexpected exception while processing command", e.getCause());
}
private NodeSource getNodeSource(String key) {

@ -72,13 +72,10 @@ public class LoadBalancerManager {
}
public CompletableFuture<Void> add(ClientConnectionsEntry entry) {
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
CompletableFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
futures.add(slaveFuture);
CompletableFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
futures.add(pubSubFuture);
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<Void> future = CompletableFuture.allOf(slaveFuture, pubSubFuture);
return future.thenAccept(r -> {
client2Entry.put(entry.getClient(), entry);
});

@ -227,12 +227,12 @@ public class TasksService extends BaseRemoteService {
result.tryFailure(ex);
return;
}
if (response.getNow() == null) {
if (r == null) {
result.trySuccess(false);
return;
}
result.trySuccess(response.getNow().isCanceled());
result.trySuccess(r.isCanceled());
});
});

@ -572,9 +572,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
res.toCompletableFuture().join();
List<Object> r = res.toCompletableFuture().join();
List<Object> r = res.getNow();
Long added = (Long) r.get(0);
Long syncs = (Long) r.get(1);
if (syncs > 0) {
@ -803,9 +802,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
res.toCompletableFuture().join();
List<Object> r = res.toCompletableFuture().join();
List<Object> r = res.getNow();
r.add(syncId);
waitSync(r);
result.trySuccess((Long) r.get(0) >= 1);
@ -1861,8 +1859,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
});
});
} else {
future.toCompletableFuture().join();
V oldValue = future.getNow();
V oldValue = future.toCompletableFuture().join();
try {
cacheWriter.delete(key);
if (oldValue != null) {
@ -2126,9 +2123,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
future.toCompletableFuture().join();
List<Object> r = future.getNow();
List<Object> r = future.toCompletableFuture().join();
long nullsAmount = (long) r.get(1);
if (nullsAmount == keys.size()) {
@ -2288,9 +2283,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
future.toCompletableFuture().join();
List<Object> r = future.getNow();
List<Object> r = future.toCompletableFuture().join();
if (r.size() < 2) {
result.trySuccess(null);
@ -3239,8 +3232,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
private void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration, boolean addToConfig) {
if (osType == null) {
RFuture<Map<String, String>> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
serverFuture.toCompletableFuture().join();
String os = serverFuture.getNow().get("os");
String os = serverFuture.toCompletableFuture().join().get("os");
if (os.contains("Windows")) {
osType = BaseEventCodec.OSType.WINDOWS;
} else if (os.contains("NONSTOP")) {

@ -17,11 +17,7 @@ package org.redisson.mapreduce;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import org.redisson.Redisson;
import org.redisson.api.RExecutorService;
@ -105,12 +101,20 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
mapperTask.addObjectName(objectName);
RFuture<?> mapperFuture = executor.submitAsync(mapperTask);
try {
if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
if (timeout > 0) {
try {
mapperFuture.toCompletableFuture().get(timeout - timeSpent, TimeUnit.MILLISECONDS);
} catch (ExecutionException | CancellationException | TimeoutException e) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
}
}
if (timeout == 0) {
mapperFuture.await();
try {
mapperFuture.toCompletableFuture().join();
} catch (CompletionException | CancellationException e) {
// skip
}
}
} catch (InterruptedException e) {
mapperFuture.cancel(true);

@ -294,11 +294,13 @@ public class CompletableFutureWrapper<V> implements RFuture<V> {
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
try {
future.get();
future.get(timeout, unit);
} catch (ExecutionException e) {
// skip
} catch (TimeoutException e) {
return false;
}
return future.isDone();
return true;
}
@Override

@ -16,6 +16,7 @@
package org.redisson.reactive;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
@ -54,6 +55,9 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
future.onComplete((v, e) -> {
if (e != null) {
if (e instanceof CompletionException) {
e = e.getCause();
}
emitter.error(e);
return;
}

@ -141,8 +141,13 @@ public class RedissonBaseNodes implements BaseRedisNodes {
boolean res = true;
for (Map.Entry<RedisConnection, RFuture<String>> entry : result.entrySet()) {
RFuture<String> f = entry.getValue();
f.awaitUninterruptibly();
if (!"PONG".equals(f.getNow())) {
String pong = null;
try {
pong = f.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
if (!"PONG".equals(pong)) {
res = false;
}
entry.getKey().closeAsync();

@ -38,7 +38,6 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -317,8 +316,8 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
cancelExecution(optionsCopy, mayInterruptIfRunning, this, cancelRequestMapName);
try {
awaitUninterruptibly(60, TimeUnit.SECONDS);
} catch (CancellationException e) {
toCompletableFuture().get(60, TimeUnit.SECONDS);
} catch (Exception e) {
// skip
}
return isCancelled();

@ -20,6 +20,9 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.redisson.RedissonBucket;
import org.redisson.api.RFuture;
@ -85,18 +88,20 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId);
RFuture<Boolean> futureAdd = remoteService.addAsync(requestQueueName, request, addPromise);
futureAdd.await();
if (!futureAdd.isSuccess()) {
Boolean res;
try {
res = futureAdd.toCompletableFuture().join();
} catch (Exception e) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
if (ackFuture != null) {
ackFuture.cancel(false);
}
throw futureAdd.cause();
throw e.getCause();
}
if (!futureAdd.get()) {
if (!res) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
@ -109,13 +114,20 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
// poll for the ack only if expected
if (ackFuture != null) {
String ackName = remoteService.getAckName(requestId);
ackFuture.await(optionsCopy.getAckTimeoutInMillis());
RemoteServiceAck ack = ackFuture.getNow();
RemoteServiceAck ack = null;
try {
ack = ackFuture.toCompletableFuture().get(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
if (ack == null) {
RFuture<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackName, requestId);
ackFutureAttempt.await(optionsCopy.getAckTimeoutInMillis());
ack = ackFutureAttempt.getNow();
try {
ack = ackFutureAttempt.toCompletableFuture().get(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
if (ack == null) {
throw new RemoteServiceAckTimeoutException("No ACK response after "
+ optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
@ -126,8 +138,12 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
// poll for the response only if expected
if (responseFuture != null) {
responseFuture.awaitUninterruptibly();
RemoteServiceResponse response = (RemoteServiceResponse) responseFuture.getNow();
RemoteServiceResponse response = null;
try {
response = (RemoteServiceResponse) responseFuture.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
if (response == null) {
throw new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);

@ -16,6 +16,7 @@
package org.redisson.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
@ -60,6 +61,9 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
future.onComplete((res, e) -> {
if (e != null) {
if (e instanceof CompletionException) {
e = e.getCause();
}
p.onError(e);
return;
}

@ -663,9 +663,8 @@ public class BaseTransactionalMap<K, V> {
return;
}
Set<K> set = future.getNow();
Map<HashValue, MapEntry> newstate = new HashMap<HashValue, MapEntry>(state);
for (Iterator<K> iterator = set.iterator(); iterator.hasNext();) {
for (Iterator<K> iterator = res.iterator(); iterator.hasNext();) {
K key = iterator.next();
MapEntry value = newstate.remove(toKeyHash(key));
if (value == MapEntry.NULL) {
@ -677,10 +676,10 @@ public class BaseTransactionalMap<K, V> {
if (entry == MapEntry.NULL) {
continue;
}
set.add((K) entry.getKey());
res.add((K) entry.getKey());
}
result.trySuccess(set);
result.trySuccess(res);
});
return result;
@ -777,7 +776,7 @@ public class BaseTransactionalMap<K, V> {
return;
}
map.putAll(future.getNow());
map.putAll(res);
result.trySuccess(map);
});

@ -190,9 +190,8 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return;
}
Set<V> set = future.getNow();
Map<HashValue, Object> newstate = new HashMap<>(state);
for (Iterator<V> iterator = set.iterator(); iterator.hasNext();) {
for (Iterator<V> iterator = res.iterator(); iterator.hasNext();) {
V key = iterator.next();
Object value = newstate.remove(toHash(key));
if (value == NULL) {
@ -204,10 +203,10 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
if (value == NULL) {
continue;
}
set.add((V) value);
res.add((V) value);
}
result.trySuccess(set);
result.trySuccess(res);
});
return result;

@ -183,15 +183,14 @@ public class RedissonTransaction implements RTransaction {
}
String id = generateId();
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id, localCaches, operations);
future.onComplete((res, ex) -> {
RPromise<Void> result = new RedissonPromise<>();
CompletableFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id, localCaches, operations);
future.whenComplete((hashes, ex) -> {
if (ex != null) {
result.tryFailure(new TransactionException("Unable to execute transaction", ex));
return;
}
Map<HashKey, HashValue> hashes = future.getNow();
try {
checkTimeout();
} catch (TransactionTimeoutException e) {
@ -404,12 +403,12 @@ public class RedissonTransaction implements RTransaction {
return hashes;
}
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(String requestId, Set<String> localCaches, List<TransactionalOperation> operations) {
private CompletableFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(String requestId, Set<String> localCaches, List<TransactionalOperation> operations) {
if (localCaches.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.emptyMap());
return CompletableFuture.completedFuture(Collections.emptyMap());
}
RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<>();
CompletableFuture<Map<HashKey, HashValue>> result = new CompletableFuture<>();
Map<HashKey, HashValue> hashes = new HashMap<>(localCaches.size());
RedissonBatch batch = createBatch();
for (TransactionalOperation transactionalOperation : operations) {
@ -437,13 +436,13 @@ public class RedissonTransaction implements RTransaction {
RFuture<BatchResult<?>> batchListener = batch.executeAsync();
batchListener.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
result.completeExceptionally(e);
return;
}
AsyncCountDownLatch latch = new AsyncCountDownLatch();
latch.latch(() -> {
result.trySuccess(hashes);
result.complete(hashes);
}, hashes.size());
List<CompletableFuture<?>> subscriptionFutures = new ArrayList<>();
@ -489,21 +488,21 @@ public class RedissonTransaction implements RTransaction {
RFuture<BatchResult<?>> publishFuture = publishBatch.executeAsync();
publishFuture.onComplete((res2, ex2) -> {
result.onComplete((res3, ex3) -> {
result.whenComplete((res3, ex3) -> {
for (RTopic topic : topics) {
topic.removeAllListeners();
}
});
if (ex2 != null) {
result.tryFailure(ex2);
result.completeExceptionally(ex2);
return;
}
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
result.tryFailure(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms"));
result.completeExceptionally(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms"));
}
}, options.getResponseTimeout(), TimeUnit.MILLISECONDS);
});

@ -16,8 +16,10 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
@ -894,9 +896,11 @@ public class RedisRunner {
RedisConnection connection = c.connect();
try {
connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
.await(3, TimeUnit.SECONDS);
.toCompletableFuture().get(3, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
//shutdown via command failed, lets wait and kill it later.
} catch (ExecutionException | TimeoutException e) {
// skip
}
c.shutdown();
connection.closeAsync().syncUninterruptibly();
@ -928,9 +932,11 @@ public class RedisRunner {
RedisConnection connection = c.connect();
try {
connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
.await(3, TimeUnit.SECONDS);
.get(3, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
//shutdown via command failed, lets wait and kill it later.
} catch (ExecutionException | TimeoutException e) {
// skip
}
c.shutdown();
connection.closeAsync().syncUninterruptibly();

@ -16,6 +16,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.command.BatchPromise;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
@ -119,7 +120,17 @@ public class RedissonBatchTest extends BaseTest {
batch.execute();
futures.forEach(f -> assertThat(f.awaitUninterruptibly(1)).isTrue());
futures.forEach(f -> {
try {
f.toCompletableFuture().get(1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
org.junit.jupiter.api.Assertions.fail(e);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
sourceClient.shutdown();
destinationClient.shutdown();
@ -411,7 +422,7 @@ public class RedissonBatchTest extends BaseTest {
for (int i = 0; i < total; i++) {
RFuture<String> f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES);
if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
f.toCompletableFuture().join();
((BatchPromise)f).getSentPromise().toCompletableFuture().join();
}
}
@ -474,10 +485,10 @@ public class RedissonBatchTest extends BaseTest {
List<Object> list = (List<Object>) f.getResponses();
assertThat(list).containsExactly(1L, 2L, 3L, 2L);
assertThat(f1.getNow()).isEqualTo(1);
assertThat(f2.getNow()).isEqualTo(2);
assertThat(f3.getNow()).isEqualTo(3);
assertThat(d1.getNow()).isEqualTo(2);
assertThat(f1.toCompletableFuture().getNow(null)).isEqualTo(1);
assertThat(f2.toCompletableFuture().getNow(null)).isEqualTo(2);
assertThat(f3.toCompletableFuture().getNow(null)).isEqualTo(3);
assertThat(d1.toCompletableFuture().getNow(null)).isEqualTo(2);
}
@ParameterizedTest
@ -535,8 +546,8 @@ public class RedissonBatchTest extends BaseTest {
RFuture<Object> val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21");
b.execute();
org.junit.jupiter.api.Assertions.assertEquals("2", val1.getNow());
org.junit.jupiter.api.Assertions.assertEquals("3", val2.getNow());
org.junit.jupiter.api.Assertions.assertEquals("2", val1.toCompletableFuture().getNow(null));
org.junit.jupiter.api.Assertions.assertEquals("3", val2.toCompletableFuture().getNow(null));
}
@ParameterizedTest
@ -549,8 +560,8 @@ public class RedissonBatchTest extends BaseTest {
RFuture<Object> val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21");
b.execute();
org.junit.jupiter.api.Assertions.assertEquals("2", val1.getNow());
org.junit.jupiter.api.Assertions.assertEquals("3", val2.getNow());
org.junit.jupiter.api.Assertions.assertEquals("2", val1.toCompletableFuture().getNow(null));
org.junit.jupiter.api.Assertions.assertEquals("3", val2.toCompletableFuture().getNow(null));
}
@ParameterizedTest
@ -706,7 +717,7 @@ public class RedissonBatchTest extends BaseTest {
int i = 0;
for (Object element : s.getResponses()) {
RFuture<Long> a = futures.get(i);
org.junit.jupiter.api.Assertions.assertEquals(a.getNow(), element);
org.junit.jupiter.api.Assertions.assertEquals(a.toCompletableFuture().getNow(null), element);
i++;
}
}

@ -54,8 +54,13 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
RedissonClient redisson = Redisson.create(config);
final RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.pollAsync(5, TimeUnit.SECONDS);
Assertions.assertFalse(f.await(1, TimeUnit.SECONDS));
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
Assertions.fail();
} catch (TimeoutException e) {
// skip
}
runner.stop();
long start = System.currentTimeMillis();
@ -144,7 +149,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.pollAsync(10, TimeUnit.SECONDS);
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
runner.stop();
runner = new RedisRunner()
@ -196,7 +205,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
for (int i = 0; i < 10; i++) {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i);
RFuture<Integer> f = queue.takeAsync();
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
futures.add(f);
}
@ -211,11 +224,15 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
for (int i = 0; i < 10; i++) {
RFuture<Integer> f = futures.get(i);
f.await(20, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(20, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
if (f.cause() != null) {
f.cause().printStackTrace();
}
Integer result = f.getNow();
Integer result = f.toCompletableFuture().getNow(null);
assertThat(result).isEqualTo(i*100);
}
@ -273,7 +290,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.takeAsync();
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
master.stop();
@ -314,7 +335,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.takeAsync();
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
e.printStackTrace();
}
runner.stop();
runner = new RedisRunner()

@ -227,8 +227,13 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
final RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("bounded-queue:pollTimeout");
assertThat(queue1.trySetCapacity(5)).isTrue();
RFuture<Integer> f = queue1.pollAsync(5, TimeUnit.SECONDS);
Assertions.assertFalse(f.await(1, TimeUnit.SECONDS));
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
Assertions.fail();
} catch (TimeoutException e) {
// skip
}
runner.stop();
long start = System.currentTimeMillis();
@ -308,7 +313,11 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("queue:pollany");
RFuture<Integer> f = queue1.pollAsync(10, TimeUnit.SECONDS);
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
runner.stop();
runner = new RedisRunner()
@ -349,7 +358,11 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("testTakeReattach");
assertThat(queue1.trySetCapacity(15)).isTrue();
RFuture<Integer> f = queue1.takeAsync();
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
runner.stop();
runner = new RedisRunner()

@ -267,62 +267,62 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadFourthWaiter = 105;
// take the lock successfully
Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNull(ttl);
// fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(firstTTL);
Assertions.assertTrue(firstTTL >= 29900 && firstTTL <= 30100, "Expected 30000 +/- 100 but was " + firstTTL);
// fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(secondTTL);
Assertions.assertTrue(secondTTL >= 34900 && secondTTL <= 35100, "Expected 35000 +/- 100 but was " + secondTTL);
// try the third, and check the TTL
Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(thirdTTL);
Assertions.assertTrue(thirdTTL >= 39900 && thirdTTL <= 40100, "Expected 40000 +/- 100 but was " + thirdTTL);
// try the fourth, and check the TTL
Long fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
Long fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(fourthTTL);
Assertions.assertTrue(fourthTTL >= 44900 && fourthTTL <= 45100, "Expected 45000 +/- 100 but was " + fourthTTL);
// wait timeout the second waiter
lock.acquireFailedAsync(5000, TimeUnit.MILLISECONDS, threadSecondWaiter).await().get();
lock.acquireFailedAsync(5000, TimeUnit.MILLISECONDS, threadSecondWaiter).toCompletableFuture().join();;
// try the first, and check the TTL
firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(firstTTL);
Assertions.assertTrue(firstTTL >= 29900 && firstTTL <= 30100, "Expected 30000 +/- 100 but was " + firstTTL);
// try the third, and check the TTL
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(thirdTTL);
Assertions.assertTrue(thirdTTL >= 34700 && thirdTTL <= 35300, "Expected 35000 +/- 300 but was " + thirdTTL);
// try the fourth, and check the TTL
fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(fourthTTL);
Assertions.assertTrue(fourthTTL >= 39900 && fourthTTL <= 40100, "Expected 40000 +/- 100 but was " + fourthTTL);
// unlock the original lock holder
Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow();
Boolean unlocked = lock.unlockInnerAsync(threadInit).toCompletableFuture().join();;
Assertions.assertNotNull(unlocked);
Assertions.assertTrue(unlocked);
// acquire the lock immediately with the 1nd
ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNull(ttl);
// try the third, and check the TTL
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(thirdTTL);
Assertions.assertTrue(thirdTTL >= 29700 && thirdTTL <= 30300, "Expected 30000 +/- 300 but was " + thirdTTL);
fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(fourthTTL);
Assertions.assertTrue(fourthTTL >= 34900 && fourthTTL <= 35100, "Expected 35000 +/- 100 but was " + fourthTTL);
}
@ -348,40 +348,40 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadThirdWaiter = 104;
// take the lock successfully
Boolean locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Boolean locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertTrue(locked);
// fail to get the lock, but end up in the thread queue w/ ttl + 100ms timeout
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertFalse(locked);
// fail to get the lock again, but end up in the thread queue w/ ttl + 200ms timeout
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertFalse(locked);
// unlock the original lock holder
Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow();
Boolean unlocked = lock.unlockInnerAsync(threadInit).toCompletableFuture().join();;
Assertions.assertTrue(unlocked);
// get the lock
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertTrue(locked);
// fail to get the lock, keeping ttl of lock ttl + 200ms
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertFalse(locked);
// fail to get the lock, keeping ttl of lock ttl + 100ms
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertFalse(locked);
// fail to get the lock, keeping ttl of lock ttl + 200ms
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertFalse(locked);
Thread.sleep(500);
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();;
Assertions.assertTrue(locked);
}
@ -405,36 +405,36 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadThirdWaiter = 104;
// take the lock successfully
Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNull(ttl);
// fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(firstTTL);
// fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(secondTTL);
// unlock the original lock holder
Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow();
Boolean unlocked = lock.unlockInnerAsync(threadInit).toCompletableFuture().join();;
Assertions.assertNotNull(unlocked);
Assertions.assertTrue(unlocked);
ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNull(ttl);
Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(thirdTTL);
Long secondTTLAgain = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTLAgain = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(secondTTLAgain);
long diff = secondTTL - secondTTLAgain;
Assertions.assertTrue(diff > 4900 && diff < 5100, "Expected 5000 +/- 100 but was " + diff);
diff = thirdTTL - secondTTLAgain;
Assertions.assertTrue(diff > 4900 && diff < 5100, "Expected 5000 +/- 100 but was " + diff);
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(thirdTTL);
diff = thirdTTL - secondTTLAgain;
Assertions.assertTrue(diff > 4900 && diff < 5100, "Expected 5000 +/- 100 but was " + diff);
@ -462,18 +462,18 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadThirdWaiter = 104;
// take the lock successfully
Long ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Long ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNull(ttl);
// fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
Long firstTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Long firstTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(firstTTL);
// fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
Long secondTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(secondTTL);
Long thirdTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Long thirdTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNotNull(thirdTTL);
long diff = thirdTTL - firstTTL;
@ -481,7 +481,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Thread.sleep(thirdTTL + threadWaitTime);
ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();;
Assertions.assertNull(ttl);
}

@ -47,7 +47,11 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.pollAsync(10, TimeUnit.SECONDS);
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
runner.stop();
runner = new RedisRunner()
@ -83,7 +87,11 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.takeAsync();
f.await(1, TimeUnit.SECONDS);
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
runner.stop();
runner = new RedisRunner()

@ -440,7 +440,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
f.toCompletableFuture().join();
RFuture<Long> resFuture = ri.resultMethod(100L);
resFuture.toCompletableFuture().join();
assertThat(resFuture.getNow()).isEqualTo(200);
assertThat(resFuture.toCompletableFuture().join()).isEqualTo(200);
r1.shutdown();
r2.shutdown();
@ -492,8 +492,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
RFuture<Void> f = ri.voidMethod("someName", 100L);
f.toCompletableFuture().join();
RFuture<Long> resFuture = ri.resultMethod(100L);
resFuture.toCompletableFuture().join();
assertThat(resFuture.getNow()).isEqualTo(200);
assertThat(resFuture.toCompletableFuture().join()).isEqualTo(200);
r1.shutdown();
r2.shutdown();

@ -58,7 +58,7 @@ public class RedissonScriptTest extends BaseTest {
public void testEvalAsync() {
RScript script = redisson.getScript(StringCodec.INSTANCE);
RFuture<List<Object>> res = script.evalAsync(RScript.Mode.READ_ONLY, "return {'1','2','3.3333','foo',nil,'bar'}", RScript.ReturnType.MULTI, Collections.emptyList());
assertThat(res.awaitUninterruptibly().getNow()).containsExactly("1", "2", "3.3333", "foo");
assertThat(res.toCompletableFuture().join()).containsExactly("1", "2", "3.3333", "foo");
}
@Test
@ -117,7 +117,7 @@ public class RedissonScriptTest extends BaseTest {
public void testScriptLoadAsync() {
redisson.getBucket("foo").set("bar");
RFuture<String> r = redisson.getScript().scriptLoadAsync("return redis.call('get', 'foo')");
Assertions.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r.awaitUninterruptibly().getNow());
Assertions.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r.toCompletableFuture().join());
String r1 = redisson.getScript().evalSha(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList());
Assertions.assertEquals("bar", r1);
}
@ -143,7 +143,7 @@ public class RedissonScriptTest extends BaseTest {
String r = redisson.getScript().eval(Mode.READ_ONLY, "return redis.call('get', 'foo')", RScript.ReturnType.VALUE);
Assertions.assertEquals("bar", r);
RFuture<Object> r1 = redisson.getScript().evalShaAsync(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList());
Assertions.assertEquals("bar", r1.awaitUninterruptibly().getNow());
Assertions.assertEquals("bar", r1.toCompletableFuture().join());
}

@ -420,7 +420,11 @@ public class RedissonTest extends BaseTest {
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
try {
rFuture.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
if (!rFuture.isSuccess()) {
if (rFuture.cause().getMessage().contains("READONLY You can't write against")) {
readonlyErrors++;
@ -549,7 +553,11 @@ public class RedissonTest extends BaseTest {
assertThat(newMaster).isNotNull();
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
try {
rFuture.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
if (!rFuture.isSuccess()) {
Assertions.fail();
}
@ -649,7 +657,11 @@ public class RedissonTest extends BaseTest {
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
try {
rFuture.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
if (!rFuture.isSuccess()) {
errors++;
} else {
@ -723,7 +735,11 @@ public class RedissonTest extends BaseTest {
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
try {
rFuture.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
if (!rFuture.isSuccess()) {
rFuture.cause().printStackTrace();
errors++;

@ -1205,7 +1205,11 @@ public class RedissonTopicTest {
}
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
try {
rFuture.toCompletableFuture().join();
} catch (Exception e) {
// skip
}
}
};
};

@ -161,12 +161,11 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
@Test
public void testDelay() {
public void testDelay() throws ExecutionException, InterruptedException, TimeoutException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
long start = System.currentTimeMillis();
RScheduledFuture<?> f = executor.schedule(new ScheduledCallableTask(), 11, TimeUnit.SECONDS);
assertThat(f.awaitUninterruptibly(12000)).isTrue();
assertThat(f.isSuccess()).isTrue();
f.toCompletableFuture().get(12, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - start).isBetween(11000L, 11500L);
Reflect.onClass(RedissonExecutorService.class).set("RESULT_OPTIONS", RemoteInvocationOptions.defaults().noAck().expectResultWithin(3, TimeUnit.SECONDS));
@ -174,14 +173,12 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
start = System.currentTimeMillis();
RScheduledFuture<?> f1 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS);
assertThat(f1.awaitUninterruptibly(6000)).isTrue();
assertThat(f1.isSuccess()).isTrue();
f1.toCompletableFuture().get(6, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - start).isBetween(5000L, 5500L);
start = System.currentTimeMillis();
RScheduledFuture<?> f2 = executor.schedule(new RunnableTask(), 5, TimeUnit.SECONDS);
assertThat(f2.awaitUninterruptibly(6000)).isTrue();
assertThat(f2.isSuccess()).isTrue();
f2.toCompletableFuture().get(6, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - start).isBetween(5000L, 5500L);
}
@ -260,7 +257,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
}
@Test
public void testLoad() {
public void testLoad() throws InterruptedException {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", Runtime.getRuntime().availableProcessors()*2));
@ -274,7 +271,13 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
}
for (RScheduledFuture<?> future : futures) {
assertThat(future.awaitUninterruptibly(5100)).isTrue();
try {
future.toCompletableFuture().get(5100, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Assertions.fail(e);
} catch (ExecutionException e) {
// skip
}
}
node.shutdown();

Loading…
Cancel
Save