refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent 3a4eaf2f4e
commit 10751fce40

@ -31,6 +31,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -121,8 +122,8 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<>(clients.size());
CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisClientEntry entry : clients) {
RFuture<RedisConnection> f = entry.getClient().connectAsync();
f.onComplete((c, e) -> {
CompletableFuture<RedisConnection> f = entry.getClient().connectAsync();
f.whenComplete((c, e) -> {
if (c != null) {
RFuture<String> r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING);
result.put(c, r);

@ -152,11 +152,13 @@ public final class RedisClient {
public RedisConnection connect() {
try {
return connectAsync().syncUninterruptibly().getNow();
} catch (RedisException e) {
throw e;
} catch (Exception e) {
throw new RedisConnectionException("Unable to connect to: " + uri, e);
return connectAsync().join();
} catch (CompletionException e) {
if (e.getCause() instanceof RedisException) {
throw e;
} else {
throw new RedisConnectionException("Unable to connect to: " + uri, e);
}
}
}
@ -197,9 +199,9 @@ public final class RedisClient {
return promise;
}
public RFuture<RedisConnection> connectAsync() {
public CompletableFuture<RedisConnection> connectAsync() {
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
CompletableFuture<RedisConnection> ff = addrFuture.thenCompose(res -> {
return addrFuture.thenCompose(res -> {
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
@ -239,33 +241,23 @@ public final class RedisClient {
});
return r;
});
RPromise<RedisConnection> res = new RedissonPromise<>();
// TODO refactor
ff.whenComplete((r, e) -> {
if (e != null) {
res.tryFailure(e.getCause());
return;
}
res.trySuccess(r);
});
return res;
}
public RedisPubSubConnection connectPubSub() {
try {
return connectPubSubAsync().syncUninterruptibly().getNow();
} catch (RedisException e) {
throw e;
} catch (Exception e) {
throw new RedisConnectionException("Unable to connect to: " + uri, e);
return connectPubSubAsync().join();
} catch (CompletionException e) {
if (e.getCause() instanceof RedisException) {
throw e;
} else {
throw new RedisConnectionException("Unable to connect to: " + uri, e);
}
}
}
public RFuture<RedisPubSubConnection> connectPubSubAsync() {
public CompletableFuture<RedisPubSubConnection> connectPubSubAsync() {
CompletableFuture<InetSocketAddress> nameFuture = resolveAddr();
CompletableFuture<RedisPubSubConnection> ff = nameFuture.thenCompose(res -> {
return nameFuture.thenCompose(res -> {
CompletableFuture<RedisPubSubConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = pubSubBootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
@ -305,18 +297,6 @@ public final class RedisClient {
});
return r;
});
RPromise<RedisPubSubConnection> res = new RedissonPromise<>();
// TODO refactor
ff.whenComplete((r, e) -> {
if (e != null) {
res.tryFailure(e);
return;
}
res.trySuccess(r);
});
return res;
}
public void shutdown() {

@ -16,7 +16,6 @@
package org.redisson.connection;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
@ -188,19 +187,18 @@ public class ClientConnectionsEntry {
freeConnections.add(connection);
}
public RFuture<RedisConnection> connect() {
RFuture<RedisConnection> future = client.connectAsync();
future.onComplete((conn, e) -> {
public CompletableFuture<RedisConnection> connect() {
CompletableFuture<RedisConnection> future = client.connectAsync();
return future.whenComplete((conn, e) -> {
if (e != null) {
return;
}
onConnect(conn);
log.debug("new connection created: {}", conn);
allConnections.add(conn);
});
return future;
}
private void onConnect(final RedisConnection conn) {
@ -224,20 +222,18 @@ public class ClientConnectionsEntry {
connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr());
}
public RFuture<RedisPubSubConnection> connectPubSub() {
RFuture<RedisPubSubConnection> future = client.connectPubSubAsync();
future.onComplete((res, e) -> {
public CompletableFuture<RedisPubSubConnection> connectPubSub() {
CompletableFuture<RedisPubSubConnection> future = client.connectPubSubAsync();
return future.whenComplete((conn, e) -> {
if (e != null) {
return;
}
RedisPubSubConnection conn = future.getNow();
onConnect(conn);
log.debug("new pubsub connection created: {}", conn);
allSubscribeConnections.add(conn);
});
return future;
}
public Queue<RedisConnection> getAllConnections() {

@ -247,8 +247,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
RPromise<RedisConnection> result = new RedissonPromise<>();
RFuture<RedisConnection> future = client.connectAsync();
future.onComplete((connection, e) -> {
CompletableFuture<RedisConnection> future = client.connectAsync();
future.whenComplete((connection, e) -> {
if (e != null) {
result.tryFailure(e);
return;

@ -587,8 +587,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
RFuture<RedisConnection> f = client.connectAsync();
return f.toCompletableFuture().thenApply(resp -> {
CompletableFuture<RedisConnection> f = client.connectAsync();
return f.thenApply(resp -> {
if (sentinels.putIfAbsent(ipAddr, client) == null) {
log.info("sentinel: {} added", ipAddr);
}

@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -247,8 +248,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
return (T) entry.pollConnection(command);
}
protected RFuture<T> connect(ClientConnectionsEntry entry) {
return (RFuture<T>) entry.connect();
protected CompletableFuture<T> connect(ClientConnectionsEntry entry) {
return (CompletableFuture<T>) entry.connect();
}
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise, RedisCommand<?> command) {
@ -273,8 +274,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
private void createConnection(ClientConnectionsEntry entry, RPromise<T> promise) {
RFuture<T> connFuture = connect(entry);
connFuture.onComplete((conn, e) -> {
CompletableFuture<T> connFuture = connect(entry);
connFuture.whenComplete((conn, e) -> {
if (e != null) {
promiseFailure(entry, promise, e);
return;
@ -353,8 +354,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
RFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync();
connectionFuture.onComplete((c, e) -> {
CompletableFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync();
connectionFuture.whenComplete((c, e) -> {
synchronized (entry) {
if (entry.getFreezeReason() != FreezeReason.RECONNECT) {
return;

@ -24,6 +24,8 @@ import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import java.util.concurrent.CompletableFuture;
/**
* Connection pool for Publish / Subscribe
*
@ -51,7 +53,7 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
}
@Override
protected RFuture<RedisPubSubConnection> connect(ClientConnectionsEntry entry) {
protected CompletableFuture<RedisPubSubConnection> connect(ClientConnectionsEntry entry) {
return entry.connectPubSub();
}

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -115,15 +116,13 @@ public class RedissonBaseNodes implements BaseRedisNodes {
Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<>(clients.size());
CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisNode entry : clients) {
RFuture<RedisConnection> f = entry.getClient().connectAsync();
f.onComplete((c, e) -> {
CompletableFuture<RedisConnection> f = entry.getClient().connectAsync();
f.whenComplete((c, e) -> {
if (c != null) {
RFuture<String> r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING);
result.put(c, r);
latch.countDown();
} else {
latch.countDown();
}
latch.countDown();
});
}

@ -34,6 +34,7 @@ import org.redisson.misc.RedissonPromise;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
@ -119,8 +120,8 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
private <T> RFuture<T> executeAsync(T defaultValue, Codec codec, long timeout, RedisCommand<T> command, Object... params) {
RPromise<T> result = new RedissonPromise<>();
RFuture<RedisConnection> connectionFuture = client.connectAsync();
connectionFuture.onComplete((connection, ex) -> {
CompletableFuture<RedisConnection> connectionFuture = client.connectAsync();
connectionFuture.whenComplete((connection, ex) -> {
if (ex != null) {
if (defaultValue != null) {
result.trySuccess(defaultValue);

@ -59,9 +59,9 @@ public class RedisClientTest {
@Test
public void testConnectAsync() throws InterruptedException {
RFuture<RedisConnection> f = redisClient.connectAsync();
CompletableFuture<RedisConnection> f = redisClient.connectAsync();
CountDownLatch l = new CountDownLatch(2);
f.onComplete((conn, e) -> {
f.whenComplete((conn, e) -> {
assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG");
l.countDown();
});

Loading…
Cancel
Save