refactoring

pull/4061/head
Nikita Koksharov 3 years ago
parent cbb70e2f70
commit 6e2cac86e3

@ -35,6 +35,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.DataAccessException;
@ -51,6 +52,8 @@ import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@ -226,9 +229,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -20,18 +20,9 @@ import static org.redisson.client.protocol.RedisCommands.LRANGE;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@ -60,6 +51,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.DataAccessException;
@ -256,9 +248,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -20,18 +20,9 @@ import static org.redisson.client.protocol.RedisCommands.LRANGE;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -61,6 +52,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.DataAccessException;
@ -259,9 +251,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -36,6 +36,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.DataAccessException;
@ -55,6 +56,7 @@ import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -231,9 +233,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -32,6 +32,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.geo.*;
@ -50,6 +51,7 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -229,9 +231,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -32,6 +32,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.geo.*;
@ -50,6 +51,7 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -229,9 +231,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -32,6 +32,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.geo.*;
@ -50,6 +51,7 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -229,9 +231,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -32,6 +32,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.geo.*;
@ -50,6 +51,7 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -229,9 +231,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -32,6 +32,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.geo.*;
@ -50,6 +51,7 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -229,9 +231,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -32,6 +32,7 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.geo.*;
@ -50,6 +51,7 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -229,9 +231,11 @@ public class RedissonConnection extends AbstractRedisConnection {
return read(null, ByteArrayCodec.INSTANCE, KEYS, pattern);
}
Set<byte[]> results = new HashSet<byte[]>();
RFuture<Set<byte[]>> f = (RFuture<Set<byte[]>>)(Object)(executorService.readAllAsync(results, ByteArrayCodec.INSTANCE, KEYS, pattern));
return sync(f);
RFuture<Collection<byte[]>> f = executorService.readAllAsync(ByteArrayCodec.INSTANCE, KEYS, pattern);
CompletableFuture<Set<byte[]>> future = f.thenApply(r -> {
return (Set<byte[]>)new HashSet<>(r);
}).toCompletableFuture();
return sync(new CompletableFutureWrapper<>(future));
}
@Override

@ -79,8 +79,6 @@ public interface CommandAsyncExecutor {
<R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);
<T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

@ -188,53 +188,34 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return async(true, new NodeSource(client), codec, command, params, false, false);
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params) {
List<R> results = new ArrayList<R>();
return readAllAsync(results, codec, command, params);
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
List<R> results = new ArrayList<R>();
return readAllAsync(results, connectionManager.getCodec(), command, params);
return readAllAsync(connectionManager.getCodec(), command, params);
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, Codec codec, RedisCommand<T> command, Object... params) {
CompletableFuture<Collection<R>> mainPromise = createPromise();
public <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params) {
Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
AtomicInteger counter = new AtomicInteger(nodes.size());
BiConsumer<Object, Throwable> listener = new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object result, Throwable u) {
if (u != null && !(u instanceof RedisRedirectException)) {
mainPromise.completeExceptionally(u);
return;
}
if (result instanceof Collection) {
synchronized (results) {
results.addAll((Collection) result);
}
} else {
synchronized (results) {
results.add((R) result);
}
}
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.complete(results);
}
}
};
List<CompletableFuture<?>> futures = new ArrayList<>();
for (MasterSlaveEntry entry : nodes) {
RFuture<Object> f = async(true, new NodeSource(entry), codec, command, params, true, false);
f.whenComplete(listener);
futures.add(f.toCompletableFuture());
}
return new CompletableFutureWrapper<>(mainPromise);
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<Collection<R>> resFuture = future.thenApply(r -> {
List<R> results = new ArrayList<>();
for (CompletableFuture<?> f : futures) {
Object res = f.getNow(null);
if (res instanceof Collection) {
results.addAll((Collection) res);
} else {
results.add((R) res);
}
}
return results;
});
return new CompletableFutureWrapper<>(resFuture);
}
@Override

Loading…
Cancel
Save