refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent ca1e9ac7bd
commit f97f6ec27a

@ -346,7 +346,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -376,7 +376,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -379,7 +379,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -351,7 +351,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -349,7 +349,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -349,7 +349,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -349,7 +349,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -349,7 +349,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -349,7 +349,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -349,7 +349,7 @@ public class RedissonConnection extends AbstractRedisConnection {
return null;
}
if (isQueueing()) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
return null;
}

@ -705,8 +705,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private <T> void syncExecute(RemotePromise<T> promise) {
RFuture<Boolean> addFuture = promise.getAddFuture();
addFuture.syncUninterruptibly();
Boolean res = addFuture.getNow();
Boolean res = addFuture.toCompletableFuture().join();
if (!res) {
throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
}

@ -2318,8 +2318,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
if (osType == null) {
RFuture<Map<String, String>> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
serverFuture.syncUninterruptibly();
String os = serverFuture.getNow().get("os");
String os = serverFuture.toCompletableFuture().join().get("os");
if (os == null || os.contains("Windows")) {
osType = BaseEventCodec.OSType.WINDOWS;
} else if (os.contains("NONSTOP")) {

@ -424,7 +424,7 @@ public class RedissonMultiLock implements RLock {
acquiredLocks.stream()
.map(l -> (RedissonLock) l)
.map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
.forEach(f -> f.syncUninterruptibly());
.forEach(f -> f.toCompletableFuture().join());
}
return true;
@ -462,7 +462,7 @@ public class RedissonMultiLock implements RLock {
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
future.toCompletableFuture().join();
}
}

@ -135,7 +135,7 @@ public class RedissonPatternTopic implements RPatternTopic {
}
if (entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName).syncUninterruptibly();
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName).toCompletableFuture().join();
}
semaphore.release();
}

@ -158,7 +158,7 @@ public class RedissonTopic implements RTopic {
}
if (entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName).syncUninterruptibly();
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName).toCompletableFuture().join();
}
semaphore.release();
}

@ -57,14 +57,11 @@ public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionSt
V getNow();
/**
* Returns the result value when complete, or throws an
* (unchecked) exception if completed exceptionally. To better
* conform with the use of common functional forms, if a
* computation involved in the completion of this
* CompletableFuture threw an exception.
* Use toCompletableFuture().join() method instead
*
* @return the result value
*/
@Deprecated
V join();
/**
@ -95,21 +92,21 @@ public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionSt
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
* Use toCompletableFuture().get() method instead
*
* @throws InterruptedException
* if the current thread was interrupted
* @return Future object
*/
@Deprecated
RFuture<V> sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*
* Use toCompletableFuture().join() method instead
*
* @return Future object
*/
@Deprecated
RFuture<V> syncUninterruptibly();
/**

@ -45,6 +45,8 @@ import java.io.IOException;
import java.security.MessageDigest;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -90,7 +92,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
future.syncUninterruptibly();
try {
future.toCompletableFuture().join();
} catch (CompletionException e) {
throw (RuntimeException) e.getCause();
}
}
@Override
@ -100,7 +106,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (!future.await(timeout)) {
((RPromise<?>) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
future.sync();
try {
future.toCompletableFuture().get();
} catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
@Override

@ -225,7 +225,7 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
} else {
connectionFuture = connectionManager.connectionReadOp(source, null);
}
connectionFuture.syncUninterruptibly();
connectionFuture.toCompletableFuture().join();
entry.setConnectionFuture(connectionFuture);
entry.setCancelCallback(() -> {

@ -213,7 +213,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void closeNodeConnections() {
nodeConnections.values().stream()
.map(c -> c.getRedisClient().shutdownAsync())
.forEach(f -> f.join());
.forEach(f -> f.toCompletableFuture().join());
}
protected void closeNodeConnection(RedisConnection conn) {

@ -679,7 +679,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
sentinels.values().stream()
.map(s -> s.shutdownAsync())
.forEach(f -> f.join());
.forEach(f -> f.toCompletableFuture().join());
super.shutdown();
}

@ -48,6 +48,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@ -207,7 +208,7 @@ public class TasksRunnerService implements RemoteExecutorService {
Object res;
try {
RFuture<Long> future = renewRetryTime(params.getRequestId());
future.sync();
future.toCompletableFuture().get();
Callable<?> callable = decode(params);
res = callable.call();
@ -216,6 +217,13 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (RedisException e) {
finish(params.getRequestId(), true);
throw e;
} catch (ExecutionException e) {
finish(params.getRequestId(), true);
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new IllegalArgumentException(e.getCause());
}
} catch (Exception e) {
finish(params.getRequestId(), true);
throw new IllegalArgumentException(e);
@ -331,7 +339,7 @@ public class TasksRunnerService implements RemoteExecutorService {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
RFuture<Long> future = renewRetryTime(params.getRequestId());
try {
future.sync();
future.get();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
@ -344,7 +352,14 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (RedisException e) {
finish(params.getRequestId(), removeTask);
throw e;
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new IllegalArgumentException(e.getCause());
}
}
finish(params.getRequestId(), removeTask);
}

@ -213,8 +213,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RLock lock = getLockedLock(key);
try {
RFuture<V> result = getAsync(key);
result.syncUninterruptibly();
return result.getNow();
return result.toCompletableFuture().join();
} finally {
lock.unlock();
}
@ -573,7 +572,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
res.syncUninterruptibly();
res.toCompletableFuture().join();
List<Object> r = res.getNow();
Long added = (Long) r.get(0);
@ -804,7 +803,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
res.syncUninterruptibly();
res.toCompletableFuture().join();
List<Object> r = res.getNow();
r.add(syncId);
@ -943,9 +942,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
RFuture<Map<K, V>> result = getAllAsync(keys);
result.syncUninterruptibly();
return result.getNow();
return result.toCompletableFuture().join();
}
@Override
@ -1089,8 +1086,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public boolean containsKey(K key) {
RFuture<Boolean> future = containsKeyAsync(key);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -1318,7 +1314,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public void put(K key, V value) {
RFuture<Void> future = putAsync(key, value);
future.syncUninterruptibly();
future.toCompletableFuture().join();
}
RFuture<Long> removeValues(Object... keys) {
@ -1532,8 +1528,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public V getAndPut(K key, V value) {
RFuture<V> future = getAndPutAsync(key, value);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -1639,7 +1634,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public void putAll(Map<? extends K, ? extends V> map) {
RFuture<Void> result = putAllAsync(map);
result.syncUninterruptibly();
result.toCompletableFuture().join();
}
@Override
@ -1734,8 +1729,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public boolean putIfAbsent(K key, V value) {
RFuture<Boolean> result = putIfAbsentAsync(key, value);
result.syncUninterruptibly();
return result.getNow();
return result.toCompletableFuture().join();
}
@Override
@ -1829,8 +1823,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public boolean remove(K key) {
RFuture<Boolean> future = removeAsync(key);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -1868,7 +1861,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
});
});
} else {
future.syncUninterruptibly();
future.toCompletableFuture().join();
V oldValue = future.getNow();
try {
cacheWriter.delete(key);
@ -1990,8 +1983,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public boolean remove(K key, V value) {
RFuture<Boolean> future = removeAsync(key, value);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -2043,7 +2035,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
try {
cacheWriter.delete(key);
} catch (Exception e) {
putValue(key, value).syncUninterruptibly();
putValue(key, value).toCompletableFuture().join();
if (e instanceof CacheWriterException) {
throw e;
}
@ -2134,7 +2126,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
future.syncUninterruptibly();
future.toCompletableFuture().join();
List<Object> r = future.getNow();
@ -2296,7 +2288,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
});
} else {
future.syncUninterruptibly();
future.toCompletableFuture().join();
List<Object> r = future.getNow();
@ -2325,8 +2317,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public V getAndRemove(K key) {
RFuture<V> future = getAndRemoveAsync(key);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -2542,8 +2533,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public boolean replace(K key, V oldValue, V newValue) {
RFuture<Boolean> future = replaceAsync(key, oldValue, newValue);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -2864,8 +2854,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public boolean replace(K key, V value) {
RFuture<Boolean> future = replaceAsync(key, value);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -2931,8 +2920,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public V getAndReplace(K key, V value) {
RFuture<V> future = getAndReplaceAsync(key, value);
future.syncUninterruptibly();
return future.getNow();
return future.toCompletableFuture().join();
}
@Override
@ -2998,7 +2986,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public void removeAll(Set<? extends K> keys) {
RFuture<Void> future = removeAllAsync(keys);
future.syncUninterruptibly();
future.toCompletableFuture().join();
}
@Override
@ -3102,7 +3090,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
@Override
public void clear() {
RFuture<Void> future = clearAsync();
future.syncUninterruptibly();
future.toCompletableFuture().join();
}
@Override
@ -3251,7 +3239,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
private void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration, boolean addToConfig) {
if (osType == null) {
RFuture<Map<String, String>> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
serverFuture.syncUninterruptibly();
serverFuture.toCompletableFuture().join();
String os = serverFuture.getNow().get("os");
if (os.contains("Windows")) {
osType = BaseEventCodec.OSType.WINDOWS;

@ -339,7 +339,7 @@ public class CompletableFutureWrapper<V> implements RFuture<V> {
@Override
public RFuture<V> awaitUninterruptibly() {
try {
join();
future.join();
} catch (Exception e) {
// skip
}

@ -64,7 +64,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
@Override
public Map<String, String> getMemoryStatistics() {
return getMemoryStatisticsAsync().syncUninterruptibly().getNow();
return getMemoryStatisticsAsync().toCompletableFuture().join();
}
@Override
@ -84,12 +84,12 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
@Override
public boolean ping() {
return pingAsync().syncUninterruptibly().getNow();
return pingAsync().toCompletableFuture().join();
}
@Override
public boolean ping(long timeout, TimeUnit timeUnit) {
return pingAsync(timeout, timeUnit).syncUninterruptibly().getNow();
return pingAsync(timeout, timeUnit).toCompletableFuture().join();
}
@Override
@ -157,7 +157,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
@Override
public Time time() {
return timeAsync().syncUninterruptibly().getNow();
return timeAsync().toCompletableFuture().join();
}
@Override
@ -167,7 +167,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
@Override
public Map<String, String> info(InfoSection section) {
return infoAsync(section).syncUninterruptibly().getNow();
return infoAsync(section).toCompletableFuture().join();
}
@Override
@ -262,12 +262,12 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
@Override
public Map<String, String> getConfig(String parameter) {
return getConfigAsync(parameter).syncUninterruptibly().getNow();
return getConfigAsync(parameter).toCompletableFuture().join();
}
@Override
public void setConfig(String parameter, String value) {
setConfigAsync(parameter, value).syncUninterruptibly().getNow();
setConfigAsync(parameter, value).toCompletableFuture().join();
}
@Override

@ -179,7 +179,7 @@ public class RedissonBatchTest extends BaseTest {
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> counter.get() == 0);
Assertions.assertThat(hasErrors).isTrue();
executeBatch(redisson, batchOptions).syncUninterruptibly();
executeBatch(redisson, batchOptions).toCompletableFuture().join();
redisson.shutdown();
process.shutdown();
@ -272,7 +272,7 @@ public class RedissonBatchTest extends BaseTest {
assertThat(e.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
for (RFuture<?> future : futures) {
future.syncUninterruptibly();
future.toCompletableFuture().join();
}
}
@ -411,7 +411,7 @@ public class RedissonBatchTest extends BaseTest {
for (int i = 0; i < total; i++) {
RFuture<String> f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES);
if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
f.syncUninterruptibly();
f.toCompletableFuture().join();
}
}

@ -429,7 +429,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
@Test
public void testAsync() throws InterruptedException {
public void testAsync() {
RedissonClient r1 = createInstance();
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
@ -437,9 +437,9 @@ public class RedissonRemoteServiceTest extends BaseTest {
RemoteInterfaceAsync ri = r2.getRemoteService().get(RemoteInterfaceAsync.class);
RFuture<Void> f = ri.voidMethod("someName", 100L);
f.sync();
f.toCompletableFuture().join();
RFuture<Long> resFuture = ri.resultMethod(100L);
resFuture.sync();
resFuture.toCompletableFuture().join();
assertThat(resFuture.getNow()).isEqualTo(200);
r1.shutdown();
@ -490,9 +490,9 @@ public class RedissonRemoteServiceTest extends BaseTest {
RemoteInterfaceAsync ri = r2.getRemoteService().get(RemoteInterfaceAsync.class);
RFuture<Void> f = ri.voidMethod("someName", 100L);
f.sync();
f.toCompletableFuture().join();
RFuture<Long> resFuture = ri.resultMethod(100L);
resFuture.sync();
resFuture.toCompletableFuture().join();
assertThat(resFuture.getNow()).isEqualTo(200);
r1.shutdown();

@ -117,7 +117,7 @@ public class RedissonExecutorServiceSpringTest extends BaseTest {
Thread.sleep(500);
assertThat(future.sync().getNow()).isEqualTo("hello callable");
assertThat(future.toCompletableFuture().join()).isEqualTo("hello callable");
}
}

@ -84,7 +84,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter"));
future.get(5, TimeUnit.SECONDS);
future.getTaskFutures().stream().forEach(x -> x.syncUninterruptibly());
future.getTaskFutures().stream().forEach(x -> x.toCompletableFuture().join());
redisson.getKeys().delete("myCounter");
assertThat(redisson.getKeys().count()).isZero();

@ -219,7 +219,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
RScheduledExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
long start = System.currentTimeMillis();
RExecutorFuture<?> f = executor.schedule(new IncrementRunnableTask("counter"), 1, TimeUnit.SECONDS);
f.syncUninterruptibly();
f.toCompletableFuture().join();
assertThat(System.currentTimeMillis() - start).isBetween(900L, 1300L);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(2000);

Loading…
Cancel
Save