refactoring

pull/4226/head
Nikita Koksharov 3 years ago
parent a3e77aabd6
commit ab530385da

@ -9,7 +9,7 @@ import java.util.Locale;
public class RedissonRuntimeEnvironment {
public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv"));
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Devel\\projects\\redis\\Redis-x64-3.2.100\\redis-server.exe");
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\redis\\redis-server.exe");
public static final String tempDir = System.getProperty("java.io.tmpdir");
public static final String OS;
public static final boolean isWindows;

@ -27,6 +27,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
@ -309,15 +310,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -9,7 +9,7 @@ import java.util.Locale;
public class RedissonRuntimeEnvironment {
public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv"));
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Devel\\projects\\redis\\Redis-x64-3.2.100\\redis-server.exe");
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Redis\\redis-server.exe");
public static final String tempDir = System.getProperty("java.io.tmpdir");
public static final String OS;
public static final boolean isWindows;

@ -13,7 +13,6 @@ import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@ -21,17 +20,17 @@ import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
@ -108,6 +107,16 @@ public class RedissonClusterConnectionTest {
Integer slot = connection.clusterGetSlotForKey("123".getBytes());
assertThat(slot).isNotNull();
}
@Test
public void testKeys() {
RedisClusterNode node1 = connection.clusterGetNodeForSlot(1);
for (int i = 0; i < 1000; i++) {
connection.set(("test" + i).getBytes(), ("" + i).getBytes());
}
Set<byte[]> keys = connection.keys(node1, "test*".getBytes(StandardCharsets.UTF_8));
assertThat(keys).hasSize(342);
}
@Test
public void testClusterGetNodeForSlot() {

@ -309,15 +309,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -310,15 +310,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -316,15 +316,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -316,15 +316,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -316,15 +316,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -316,15 +316,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -316,15 +316,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -316,15 +316,14 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return result;
}
private final RedisStrictCommand<List<byte[]>> KEYS = new RedisStrictCommand<>("KEYS");
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
MasterSlaveEntry entry = getEntry(node);
RFuture<Collection<byte[]>> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, KEYS, pattern);
Collection<byte[]> keys = syncFuture(f);
return new HashSet<>(keys);
}
@Override

@ -87,7 +87,7 @@ public class RedisCommand<R> {
if (replayMultiDecoder != null) {
this.replayMultiDecoder = replayMultiDecoder;
} else {
this.replayMultiDecoder = (parts, state) -> null;
this.replayMultiDecoder = (parts, state) -> (R) parts;
}
}

@ -97,8 +97,6 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params);
<T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object... params);

@ -187,11 +187,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return async(true, new NodeSource(client), codec, command, params, false, false);
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
return readAllAsync(connectionManager.getCodec(), command, params);
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params) {
Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();

Loading…
Cancel
Save