redisson-spring-data-16 and redisson-spring-data-17 modules reverted back

pull/2563/head
Nikita Koksharov 5 years ago
parent f7b3fbc64a
commit d0288e275d

@ -14,6 +14,8 @@
<name>Redisson/Spring Data Redis integration</name>
<modules>
<module>redisson-spring-data-16</module>
<module>redisson-spring-data-17</module>
<module>redisson-spring-data-18</module>
<module>redisson-spring-data-20</module>
<module>redisson-spring-data-21</module>

@ -0,0 +1,76 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.12.1-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<artifactId>redisson-spring-data-16</artifactId>
<packaging>jar</packaging>
<name>Redisson/Spring Data Redis v1.6.x integration</name>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.6.6.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<archive>
<manifestEntries>
<Build-Time>${maven.build.timestamp}</Build-Time>
<Automatic-Module-Name>redisson.spring.data16</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>3.0</version>
<configuration>
<basedir>${basedir}</basedir>
<header>${basedir}/../../header.txt</header>
<quiet>false</quiet>
<failIfMissing>true</failIfMissing>
<aggregate>false</aggregate>
<includes>
<include>src/main/java/org/redisson/</include>
</includes>
<excludes>
<exclude>target/**</exclude>
</excludes>
<useDefaultExcludes>true</useDefaultExcludes>
<mapping>
<java>JAVADOC_STYLE</java>
</mapping>
<strictCheck>true</strictCheck>
<useDefaultMapping>true</useDefaultMapping>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.Convertor;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class BinaryConvertor implements Convertor<Object> {
@Override
public Object convert(Object obj) {
if (obj instanceof String) {
return ((String) obj).getBytes(CharsetUtil.UTF_8);
}
return obj;
}
}

@ -0,0 +1,34 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.redis.connection.DataType;
/**
*
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor implements Convertor<DataType> {
@Override
public DataType convert(Object obj) {
String val = obj.toString();
return DataType.fromCode(val);
}
}

@ -0,0 +1,60 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
/**
*
* @author Nikita Koksharov
*
* @param <T> type
*/
public class ObjectListReplayDecoder2<T> implements MultiDecoder<List<T>> {
private final Decoder<Object> decoder;
public ObjectListReplayDecoder2() {
this(null);
}
public ObjectListReplayDecoder2(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
public List<T> decode(List<Object> parts, State state) {
for (int i = 0; i < parts.size(); i++) {
Object object = parts.get(i);
if (object instanceof List) {
if (((List) object).isEmpty()) {
parts.set(i, null);
}
}
}
return (List<T>) parts;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
}

@ -0,0 +1,46 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.Properties;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class PropertiesDecoder implements Decoder<Properties> {
@Override
public Properties decode(ByteBuf buf, State state) {
String value = buf.toString(CharsetUtil.UTF_8);
Properties result = new Properties();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
return result;
}
}

@ -0,0 +1,127 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.connection.SentinelConnectionManager;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisSentinelConnection;
/**
* Redisson based connection factory
*
* @author Nikita Koksharov
*
*/
public class RedissonConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean {
private final static Log log = LogFactory.getLog(RedissonConnectionFactory.class);
public static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
new PassThroughExceptionTranslationStrategy(new RedissonExceptionConverter());
private Config config;
private RedissonClient redisson;
/**
* Creates factory with default Redisson configuration
*/
public RedissonConnectionFactory() {
this(Redisson.create());
}
/**
* Creates factory with defined Redisson instance
*
* @param redisson - Redisson instance
*/
public RedissonConnectionFactory(RedissonClient redisson) {
this.redisson = redisson;
}
/**
* Creates factory with defined Redisson config
*
* @param config - Redisson config
*/
public RedissonConnectionFactory(Config config) {
super();
this.config = config;
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return EXCEPTION_TRANSLATION.translate(ex);
}
@Override
public void destroy() throws Exception {
}
@Override
public void afterPropertiesSet() throws Exception {
if (config != null) {
redisson = Redisson.create(config);
}
}
@Override
public RedisConnection getConnection() {
return new RedissonConnection(redisson);
}
@Override
public boolean getConvertPipelineAndTxResults() {
return true;
}
@Override
public RedisSentinelConnection getSentinelConnection() {
if (!redisson.getConfig().isSentinelConfig()) {
throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
}
SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
for (RedisClient client : manager.getSentinels()) {
org.redisson.client.RedisConnection connection = client.connect();
try {
String res = connection.sync(RedisCommands.PING);
if ("pong".equalsIgnoreCase(res)) {
return new RedissonSentinelConnection(connection);
}
} catch (Exception e) {
log.warn("Can't connect to " + client, e);
connection.closeAsync();
}
}
throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
}

@ -0,0 +1,56 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisTimeoutException;
import org.springframework.core.convert.converter.Converter;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.RedisConnectionFailureException;
/**
* Converts Redisson exceptions to Spring compatible
*
* @author Nikita Koksharov
*
*/
public class RedissonExceptionConverter implements Converter<Exception, DataAccessException> {
@Override
public DataAccessException convert(Exception source) {
if (source instanceof RedisConnectionException) {
return new RedisConnectionFailureException(source.getMessage(), source);
}
if (source instanceof RedisException) {
return new InvalidDataAccessApiUsageException(source.getMessage(), source);
}
if (source instanceof DataAccessException) {
return (DataAccessException) source;
}
if (source instanceof RedisTimeoutException) {
return new QueryTimeoutException(source.getMessage(), source);
}
return null;
}
}

@ -0,0 +1,91 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.springframework.data.redis.connection.NamedNode;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
import org.springframework.data.redis.connection.convert.Converters;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSentinelConnection implements RedisSentinelConnection {
private final RedisConnection connection;
public RedissonSentinelConnection(RedisConnection connection) {
this.connection = connection;
}
@Override
public void failover(NamedNode master) {
connection.sync(RedisCommands.SENTINEL_FAILOVER, master.getName());
}
private static List<RedisServer> toRedisServersList(List<Map<String, String>> source) {
List<RedisServer> servers = new ArrayList<RedisServer>(source.size());
for (Map<String, String> info : source) {
servers.add(RedisServer.newServerFrom(Converters.toProperties(info)));
}
return servers;
}
@Override
public Collection<RedisServer> masters() {
List<Map<String, String>> masters = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_MASTERS);
return toRedisServersList(masters);
}
@Override
public Collection<RedisServer> slaves(NamedNode master) {
List<Map<String, String>> slaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, master.getName());
return toRedisServersList(slaves);
}
@Override
public void remove(NamedNode master) {
connection.sync(RedisCommands.SENTINEL_REMOVE, master.getName());
}
@Override
public void monitor(RedisServer master) {
connection.sync(RedisCommands.SENTINEL_MONITOR, master.getName(), master.getHost(),
master.getPort().intValue(), master.getQuorum().intValue());
}
@Override
public void close() throws IOException {
connection.closeAsync();
}
@Override
public boolean isOpen() {
return !connection.isClosed();
}
}

@ -0,0 +1,113 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.subscribeService = subscribeService;
}
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {
return;
}
DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), (byte[])message);
getListener().onMessage(msg, null);
}
});
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
}
}
@Override
protected void doUnsubscribe(boolean all, byte[]... channels) {
for (byte[] channel : channels) {
subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE);
}
}
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), (byte[])message);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
}
}
@Override
protected void doPUnsubscribe(boolean all, byte[]... patterns) {
for (byte[] pattern : patterns) {
subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE);
}
}
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
}
}

@ -0,0 +1,52 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.redis.connection.DefaultTuple;
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
/**
*
* @author Nikita Koksharov
*
*/
public class ScoredSortedListReplayDecoder implements MultiDecoder<List<Tuple>> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
@Override
public List<Tuple> decode(List<Object> parts, State state) {
List<Tuple> result = new ArrayList<Tuple>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new DefaultTuple((byte[])parts.get(i), ((Number)parts.get(i+1)).doubleValue()));
}
return result;
}
}

@ -0,0 +1,53 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.redis.connection.DefaultTuple;
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
/**
*
* @author Nikita Koksharov
*
*/
public class ScoredSortedSetReplayDecoder implements MultiDecoder<Set<Tuple>> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
@Override
public Set<Tuple> decode(List<Object> parts, State state) {
Set<Tuple> result = new LinkedHashSet<Tuple>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new DefaultTuple((byte[])parts.get(i), ((Number)parts.get(i+1)).doubleValue()));
}
return result;
}
}

@ -0,0 +1,43 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor implements Convertor<Long> {
private final TimeUnit unit;
private final TimeUnit source;
public SecondsConvertor(TimeUnit unit, TimeUnit source) {
super();
this.unit = unit;
this.source = source;
}
@Override
public Long convert(Object obj) {
return unit.convert((Long)obj, source);
}
}

@ -0,0 +1,50 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
/**
*
* @author Nikita Koksharov
*
*/
public class SetReplayDecoder<T> implements MultiDecoder<Set<T>> {
private final Decoder<Object> decoder;
public SetReplayDecoder(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
@Override
public Set<T> decode(List<Object> parts, State state) {
return new LinkedHashSet(parts);
}
}

@ -0,0 +1,75 @@
package org.redisson;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
public abstract class BaseTest {
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = createInstance();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
defaultRedisson.shutdown();
try {
RedisRunner.shutDownDefaultRedisServerInstance();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@Before
public void before() throws IOException, InterruptedException {
if (redisson == null) {
redisson = defaultRedisson;
}
if (flushBetweenTests()) {
redisson.getKeys().flushall();
}
}
@After
public void after() throws InterruptedException {
}
public static Config createConfig() {
// String redisAddress = System.getProperty("redisAddress");
// if (redisAddress == null) {
// redisAddress = "127.0.0.1:6379";
// }
Config config = new Config();
// config.setCodec(new MsgPackJacksonCodec());
// config.useSentinelServers().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389");
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001", "127.0.0.1:7000");
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
// .setPassword("mypass1");
// config.useMasterSlaveConnection()
// .setMasterAddress("127.0.0.1:6379")
// .addSlaveAddress("127.0.0.1:6399")
// .addSlaveAddress("127.0.0.1:6389");
return config;
}
public static RedissonClient createInstance() {
Config config = createConfig();
return Redisson.create(config);
}
protected boolean flushBetweenTests() {
return true;
}
}

@ -0,0 +1,156 @@
package org.redisson;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.redisson.misc.BiHashMap;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class ClusterRunner {
private final LinkedHashMap<RedisRunner, String> nodes = new LinkedHashMap<>();
private final LinkedHashMap<String, String> slaveMasters = new LinkedHashMap<>();
public ClusterRunner addNode(RedisRunner runner) {
nodes.putIfAbsent(runner, getRandomId());
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_ENABLED)) {
runner.clusterEnabled(true);
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_NODE_TIMEOUT)) {
runner.clusterNodeTimeout(5000);
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.PORT)) {
runner.randomPort(1);
runner.port(RedisRunner.findFreePort());
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.BIND)) {
runner.bind("127.0.0.1");
}
return this;
}
public ClusterRunner addNode(RedisRunner master, RedisRunner... slaves) {
addNode(master);
for (RedisRunner slave : slaves) {
addNode(slave);
slaveMasters.put(nodes.get(slave), nodes.get(master));
}
return this;
}
public synchronized ClusterProcesses run() throws IOException, InterruptedException, RedisRunner.FailedToStartRedisException {
BiHashMap<String, RedisRunner.RedisProcess> processes = new BiHashMap<>();
for (RedisRunner runner : nodes.keySet()) {
List<String> options = getClusterConfig(runner);
String confFile = runner.dir() + File.separator + nodes.get(runner) + ".conf";
System.out.println("WRITING CONFIG: for " + nodes.get(runner));
try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) {
options.stream().forEach((line) -> {
printer.println(line);
System.out.println(line);
});
}
processes.put(nodes.get(runner), runner.clusterConfigFile(confFile).run());
}
Thread.sleep(1000);
for (RedisRunner.RedisProcess process : processes.valueSet()) {
if (!process.isAlive()) {
throw new RedisRunner.FailedToStartRedisException();
}
}
return new ClusterProcesses(processes);
}
private List<String> getClusterConfig(RedisRunner runner) {
String me = runner.getInitialBindAddr() + ":" + runner.getPort();
List<String> nodeConfig = new ArrayList<>();
int c = 0;
for (RedisRunner node : nodes.keySet()) {
String nodeId = nodes.get(node);
StringBuilder sb = new StringBuilder();
String nodeAddr = node.getInitialBindAddr() + ":" + node.getPort();
sb.append(nodeId).append(" ");
sb.append(nodeAddr).append(" ");
sb.append(me.equals(nodeAddr)
? "myself,"
: "");
boolean isMaster = !slaveMasters.containsKey(nodeId);
if (isMaster) {
sb.append("master -");
} else {
sb.append("slave ").append(slaveMasters.get(nodeId));
}
sb.append(" ");
sb.append("0").append(" ");
sb.append(me.equals(nodeAddr)
? "0"
: "1").append(" ");
sb.append(c + 1).append(" ");
sb.append("connected ");
if (isMaster) {
sb.append(getSlots(c, nodes.size() - slaveMasters.size()));
c++;
}
nodeConfig.add(sb.toString());
}
nodeConfig.add("vars currentEpoch 0 lastVoteEpoch 0");
return nodeConfig;
}
private static String getSlots(int index, int groupNum) {
final double t = 16383;
int start = index == 0 ? 0 : (int) (t / groupNum * index);
int end = index == groupNum - 1 ? (int) t : (int) (t / groupNum * (index + 1)) - 1;
return start + "-" + end;
}
private static String getRandomId() {
final SecureRandom r = new SecureRandom();
return new BigInteger(160, r).toString(16);
}
public static class ClusterProcesses {
private final BiHashMap<String, RedisRunner.RedisProcess> processes;
private ClusterProcesses(BiHashMap<String, RedisRunner.RedisProcess> processes) {
this.processes = processes;
}
public RedisRunner.RedisProcess getProcess(String nodeId) {
return processes.get(nodeId);
}
public String getNodeId(RedisRunner.RedisProcess process) {
return processes.reverseGet(process);
}
public Set<RedisRunner.RedisProcess> getNodes() {
return processes.valueSet();
}
public Set<String> getNodeIds() {
return processes.keySet();
}
public synchronized Map<String, Integer> shutdown() {
return processes
.entrySet()
.stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().stop()));
}
}
}

@ -0,0 +1,58 @@
package org.redisson;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedisVersion implements Comparable<RedisVersion>{
private final String fullVersion;
private final Integer majorVersion;
private final Integer minorVersion;
private final Integer patchVersion;
public RedisVersion(String fullVersion) {
this.fullVersion = fullVersion;
Matcher matcher = Pattern.compile("^([\\d]+)\\.([\\d]+)\\.([\\d]+)$").matcher(fullVersion);
matcher.find();
majorVersion = Integer.parseInt(matcher.group(1));
minorVersion = Integer.parseInt(matcher.group(2));
patchVersion = Integer.parseInt(matcher.group(3));
}
public String getFullVersion() {
return fullVersion;
}
public int getMajorVersion() {
return majorVersion;
}
public int getMinorVersion() {
return minorVersion;
}
public int getPatchVersion() {
return patchVersion;
}
@Override
public int compareTo(RedisVersion o) {
int ma = this.majorVersion.compareTo(o.majorVersion);
int mi = this.minorVersion.compareTo(o.minorVersion);
int pa = this.patchVersion.compareTo(o.patchVersion);
return ma != 0 ? ma : mi != 0 ? mi : pa;
}
public int compareTo(String redisVersion) {
return this.compareTo(new RedisVersion(redisVersion));
}
public static int compareTo(String redisVersion1, String redisVersion2) {
return new RedisVersion(redisVersion1).compareTo(redisVersion2);
}
}

@ -0,0 +1,21 @@
package org.redisson;
import java.util.Locale;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonRuntimeEnvironment {
public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv"));
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Devel\\projects\\redis\\Redis-x64-3.2.100\\redis-server.exe");
public static final String tempDir = System.getProperty("java.io.tmpdir");
public static final String OS;
public static final boolean isWindows;
static {
OS = System.getProperty("os.name", "generic");
isWindows = OS.toLowerCase(Locale.ENGLISH).contains("win");
}
}

@ -0,0 +1,16 @@
package org.redisson.spring.data.connection;
import org.junit.Before;
import org.redisson.BaseTest;
import org.springframework.data.redis.connection.RedisConnection;
public abstract class BaseConnectionTest extends BaseTest {
RedisConnection connection;
@Before
public void init() {
connection = new RedissonConnection(redisson);
}
}

@ -0,0 +1,26 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
public class RedissonConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
assertThat(connection.echo("test".getBytes())).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isTrue();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,45 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.junit.Test;
import org.redisson.BaseTest;
public class RedissonMultiConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.exec().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.isQueueing()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat(connection.isQueueing()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,46 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.junit.Test;
import org.redisson.BaseTest;
public class RedissonPipelineConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.closePipeline().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.isPipelined()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat(connection.isPipelined()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,132 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Collection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
public class RedissonSentinelConnectionTest {
RedissonClient redisson;
RedisSentinelConnection connection;
RedisRunner.RedisProcess master;
RedisRunner.RedisProcess slave1;
RedisRunner.RedisProcess slave2;
RedisRunner.RedisProcess sentinel1;
RedisRunner.RedisProcess sentinel2;
RedisRunner.RedisProcess sentinel3;
@Before
public void before() throws FailedToStartRedisException, IOException, InterruptedException {
master = new RedisRunner()
.nosave()
.randomDir()
.run();
slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
connection = factory.getSentinelConnection();
}
@After
public void after() {
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
redisson.shutdown();
}
@Test
public void testMasters() {
Collection<RedisServer> masters = connection.masters();
assertThat(masters).hasSize(1);
}
@Test
public void testSlaves() {
Collection<RedisServer> masters = connection.masters();
Collection<RedisServer> slaves = connection.slaves(masters.iterator().next());
assertThat(slaves).hasSize(2);
}
@Test
public void testRemove() {
Collection<RedisServer> masters = connection.masters();
connection.remove(masters.iterator().next());
}
@Test
public void testMonitor() {
Collection<RedisServer> masters = connection.masters();
RedisServer master = masters.iterator().next();
master.setName(master.getName() + ":");
connection.monitor(master);
}
@Test
public void testFailover() throws InterruptedException {
Collection<RedisServer> masters = connection.masters();
connection.failover(masters.iterator().next());
Thread.sleep(10000);
RedisServer newMaster = connection.masters().iterator().next();
assertThat(masters.iterator().next().getPort()).isNotEqualTo(newMaster.getPort());
}
}

@ -0,0 +1,54 @@
package org.redisson.spring.data.connection;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
connection.publish("test".getBytes(), "msg".getBytes());
}
@Test
public void testUnSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
}
}

@ -0,0 +1,76 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.12.1-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<artifactId>redisson-spring-data-17</artifactId>
<packaging>jar</packaging>
<name>Redisson/Spring Data Redis v1.7.x integration</name>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.7.11.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<archive>
<manifestEntries>
<Build-Time>${maven.build.timestamp}</Build-Time>
<Automatic-Module-Name>redisson.spring.data17</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>3.0</version>
<configuration>
<basedir>${basedir}</basedir>
<header>${basedir}/../../header.txt</header>
<quiet>false</quiet>
<failIfMissing>true</failIfMissing>
<aggregate>false</aggregate>
<includes>
<include>src/main/java/org/redisson/</include>
</includes>
<excludes>
<exclude>target/**</exclude>
</excludes>
<useDefaultExcludes>true</useDefaultExcludes>
<mapping>
<java>JAVADOC_STYLE</java>
</mapping>
<strictCheck>true</strictCheck>
<useDefaultMapping>true</useDefaultMapping>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.Convertor;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class BinaryConvertor implements Convertor<Object> {
@Override
public Object convert(Object obj) {
if (obj instanceof String) {
return ((String) obj).getBytes(CharsetUtil.UTF_8);
}
return obj;
}
}

@ -0,0 +1,34 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.redis.connection.DataType;
/**
*
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor implements Convertor<DataType> {
@Override
public DataType convert(Object obj) {
String val = obj.toString();
return DataType.fromCode(val);
}
}

@ -0,0 +1,41 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metric;
/**
*
* @author Nikita Koksharov
*
*/
public class DistanceConvertor implements Convertor<Distance> {
private final Metric metric;
public DistanceConvertor(Metric metric) {
super();
this.metric = metric;
}
@Override
public Distance convert(Object obj) {
return new Distance((Double)obj, metric);
}
}

@ -0,0 +1,60 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
/**
*
* @author Nikita Koksharov
*
* @param <T> type
*/
public class ObjectListReplayDecoder2<T> implements MultiDecoder<List<T>> {
private final Decoder<Object> decoder;
public ObjectListReplayDecoder2() {
this(null);
}
public ObjectListReplayDecoder2(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
public List<T> decode(List<Object> parts, State state) {
for (int i = 0; i < parts.size(); i++) {
Object object = parts.get(i);
if (object instanceof List) {
if (((List) object).isEmpty()) {
parts.set(i, null);
}
}
}
return (List<T>) parts;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
}

@ -0,0 +1,49 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.geo.Point;
/**
*
* @author Nikita Koksharov
*
*/
public class PointDecoder implements MultiDecoder<Point> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
@Override
public Point decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
Double longitude = (Double)parts.get(0);
Double latitude = (Double)parts.get(1);
return new Point(longitude, latitude);
}
}

@ -0,0 +1,46 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.Properties;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class PropertiesDecoder implements Decoder<Properties> {
@Override
public Properties decode(ByteBuf buf, State state) {
String value = buf.toString(CharsetUtil.UTF_8);
Properties result = new Properties();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
return result;
}
}

@ -0,0 +1,119 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.misc.RedisURI;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisClusterNode.Flag;
import org.springframework.data.redis.connection.RedisClusterNode.LinkState;
import org.springframework.data.redis.connection.RedisClusterNode.RedisClusterNodeBuilder;
import org.springframework.data.redis.connection.RedisClusterNode.SlotRange;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisClusterNodeDecoder implements Decoder<List<RedisClusterNode>> {
@Override
public List<RedisClusterNode> decode(ByteBuf buf, State state) throws IOException {
String response = buf.toString(CharsetUtil.UTF_8);
List<RedisClusterNode> nodes = new ArrayList<RedisClusterNode>();
for (String nodeInfo : response.split("\n")) {
String[] params = nodeInfo.split(" ");
String nodeId = params[0];
String flagsStr = params[2];
Set<Flag> flags = EnumSet.noneOf(Flag.class);
for (String flag : flagsStr.split(",")) {
String flagValue = flag.toUpperCase().replaceAll("\\?", "");
flags.add(Flag.valueOf(flagValue));
}
RedisURI address = null;
if (!flags.contains(Flag.NOADDR)) {
String addr = params[1].split("@")[0];
address = new RedisURI("redis://" + addr);
}
String masterId = params[3];
if ("-".equals(masterId)) {
masterId = null;
}
Set<Integer> slotsCollection = new HashSet<Integer>();
LinkState linkState = null;
if (params.length >= 8 && params[7] != null) {
linkState = LinkState.valueOf(params[7].toUpperCase());
}
if (params.length > 8) {
for (int i = 0; i < params.length - 8; i++) {
String slots = params[i + 8];
if (slots.indexOf("-<-") != -1 || slots.indexOf("->-") != -1) {
continue;
}
String[] parts = slots.split("-");
if(parts.length == 1) {
slotsCollection.add(Integer.valueOf(parts[0]));
} else if(parts.length == 2) {
for (int j = Integer.valueOf(parts[0]); j < Integer.valueOf(parts[1]) + 1; j++) {
slotsCollection.add(j);
}
}
}
}
NodeType type = null;
if (flags.contains(Flag.MASTER)) {
type = NodeType.MASTER;
} else if (flags.contains(Flag.SLAVE)) {
type = NodeType.SLAVE;
}
RedisClusterNodeBuilder builder = RedisClusterNode.newRedisClusterNode()
.linkState(linkState)
.slaveOf(masterId)
.serving(new SlotRange(slotsCollection))
.withId(nodeId)
.promotedAs(type)
.withFlags(flags);
if (address != null) {
builder.listeningAt(address.getHost(), address.getPort());
}
nodes.add(builder.build());
}
return nodes;
}
}

@ -0,0 +1,380 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.connection.MasterSlaveEntry;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisClusterNode.SlotRange;
import org.springframework.data.redis.connection.convert.StringToRedisClientInfoConverter;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonClusterConnection extends RedissonConnection implements RedisClusterConnection {
private static final RedisStrictCommand<List<RedisClusterNode>> CLUSTER_NODES =
new RedisStrictCommand<List<RedisClusterNode>>("CLUSTER", "NODES", new RedisClusterNodeDecoder());
public RedissonClusterConnection(RedissonClient redisson) {
super(redisson);
}
@Override
public Iterable<RedisClusterNode> clusterGetNodes() {
return read(null, StringCodec.INSTANCE, CLUSTER_NODES);
}
@Override
public Collection<RedisClusterNode> clusterGetSlaves(RedisClusterNode master) {
Iterable<RedisClusterNode> res = clusterGetNodes();
RedisClusterNode masterNode = null;
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
if (master.getHost().equals(redisClusterNode.getHost())
&& master.getPort().equals(redisClusterNode.getPort())) {
masterNode = redisClusterNode;
break;
}
}
if (masterNode == null) {
throw new IllegalStateException("Unable to find master node: " + master);
}
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
if (redisClusterNode.getMasterId() == null
|| !redisClusterNode.getMasterId().equals(masterNode.getId())) {
iterator.remove();
}
}
return (Collection<RedisClusterNode>) res;
}
@Override
public Map<RedisClusterNode, Collection<RedisClusterNode>> clusterGetMasterSlaveMap() {
Iterable<RedisClusterNode> res = clusterGetNodes();
Set<RedisClusterNode> masters = new HashSet<RedisClusterNode>();
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
if (redisClusterNode.isMaster()) {
masters.add(redisClusterNode);
}
}
Map<RedisClusterNode, Collection<RedisClusterNode>> result = new HashMap<RedisClusterNode, Collection<RedisClusterNode>>();
for (Iterator<RedisClusterNode> iterator = res.iterator(); iterator.hasNext();) {
RedisClusterNode redisClusterNode = iterator.next();
for (RedisClusterNode masterNode : masters) {
if (redisClusterNode.getMasterId() != null
&& redisClusterNode.getMasterId().equals(masterNode.getId())) {
Collection<RedisClusterNode> list = result.get(masterNode);
if (list == null) {
list = new ArrayList<RedisClusterNode>();
result.put(masterNode, list);
}
list.add(redisClusterNode);
}
}
}
return result;
}
@Override
public Integer clusterGetSlotForKey(byte[] key) {
RFuture<Integer> f = executorService.readAsync((String)null, StringCodec.INSTANCE, RedisCommands.KEYSLOT, key);
return syncFuture(f);
}
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {
Iterable<RedisClusterNode> res = clusterGetNodes();
for (RedisClusterNode redisClusterNode : res) {
if (redisClusterNode.isMaster() && redisClusterNode.getSlotRange().contains(slot)) {
return redisClusterNode;
}
}
return null;
}
@Override
public RedisClusterNode clusterGetNodeForKey(byte[] key) {
int slot = executorService.getConnectionManager().calcSlot(key);
return clusterGetNodeForSlot(slot);
}
@Override
public ClusterInfo clusterGetClusterInfo() {
RFuture<Map<String, String>> f = executorService.readAsync((String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO);
syncFuture(f);
Properties props = new Properties();
for (Entry<String, String> entry : f.getNow().entrySet()) {
props.setProperty(entry.getKey(), entry.getValue());
}
return new ClusterInfo(props);
}
@Override
public void clusterAddSlots(RedisClusterNode node, int... slots) {
MasterSlaveEntry entry = getEntry(node);
List<Integer> params = convert(slots);
RFuture<Map<String, String>> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_ADDSLOTS, params.toArray());
syncFuture(f);
}
protected List<Integer> convert(int... slots) {
List<Integer> params = new ArrayList<Integer>();
for (int slot : slots) {
params.add(slot);
}
return params;
}
@Override
public void clusterAddSlots(RedisClusterNode node, SlotRange range) {
clusterAddSlots(node, range.getSlotsArray());
}
@Override
public Long clusterCountKeysInSlot(int slot) {
RedisClusterNode node = clusterGetNodeForSlot(slot);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(new InetSocketAddress(node.getHost(), node.getPort()));
RFuture<Long> f = executorService.readAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_COUNTKEYSINSLOT, slot);
return syncFuture(f);
}
@Override
public void clusterDeleteSlots(RedisClusterNode node, int... slots) {
MasterSlaveEntry entry = getEntry(node);
List<Integer> params = convert(slots);
RFuture<Long> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_DELSLOTS, params.toArray());
syncFuture(f);
}
@Override
public void clusterDeleteSlotsInRange(RedisClusterNode node, SlotRange range) {
clusterDeleteSlots(node, range.getSlotsArray());
}
@Override
public void clusterForget(RedisClusterNode node) {
RFuture<Void> f = executorService.writeAsync((String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_FORGET, node.getId());
syncFuture(f);
}
@Override
public void clusterMeet(RedisClusterNode node) {
Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command!");
Assert.hasText(node.getHost(), "Node to meet cluster must have a host!");
Assert.isTrue(node.getPort() > 0, "Node to meet cluster must have a port greater 0!");
RFuture<Void> f = executorService.writeAsync((String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_MEET, node.getHost(), node.getPort());
syncFuture(f);
}
@Override
public void clusterSetSlot(RedisClusterNode node, int slot, AddSlots mode) {
MasterSlaveEntry entry = getEntry(node);
RFuture<Map<String, String>> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_SETSLOT, slot, mode);
syncFuture(f);
}
private static final RedisStrictCommand<List<String>> CLUSTER_GETKEYSINSLOT = new RedisStrictCommand<List<String>>("CLUSTER", "GETKEYSINSLOT", new ObjectListReplayDecoder<String>());
@Override
public List<byte[]> clusterGetKeysInSlot(int slot, Integer count) {
RFuture<List<byte[]>> f = executorService.readAsync((String)null, ByteArrayCodec.INSTANCE, CLUSTER_GETKEYSINSLOT, slot, count);
return syncFuture(f);
}
@Override
public void clusterReplicate(RedisClusterNode master, RedisClusterNode slave) {
MasterSlaveEntry entry = getEntry(master);
RFuture<Long> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CLUSTER_REPLICATE, slave.getId());
syncFuture(f);
}
@Override
public String ping(RedisClusterNode node) {
return execute(node, RedisCommands.PING);
}
@Override
public void bgReWriteAof(RedisClusterNode node) {
execute(node, RedisCommands.BGREWRITEAOF);
}
@Override
public void bgSave(RedisClusterNode node) {
execute(node, RedisCommands.BGSAVE);
}
@Override
public Long lastSave(RedisClusterNode node) {
return execute(node, RedisCommands.LASTSAVE);
}
@Override
public void save(RedisClusterNode node) {
execute(node, RedisCommands.SAVE);
}
@Override
public Long dbSize(RedisClusterNode node) {
return execute(node, RedisCommands.DBSIZE);
}
private <T> T execute(RedisClusterNode node, RedisCommand<T> command) {
MasterSlaveEntry entry = getEntry(node);
RFuture<T> f = executorService.writeAsync(entry, StringCodec.INSTANCE, command);
return syncFuture(f);
}
protected MasterSlaveEntry getEntry(RedisClusterNode node) {
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(new InetSocketAddress(node.getHost(), node.getPort()));
return entry;
}
@Override
public void flushDb(RedisClusterNode node) {
execute(node, RedisCommands.FLUSHDB);
}
@Override
public void flushAll(RedisClusterNode node) {
execute(node, RedisCommands.FLUSHALL);
}
@Override
public Properties info(RedisClusterNode node) {
Map<String, String> info = execute(node, RedisCommands.INFO_ALL);
Properties result = new Properties();
for (Entry<String, String> entry : info.entrySet()) {
result.setProperty(entry.getKey(), entry.getValue());
}
return result;
}
@Override
public Properties info(RedisClusterNode node, String section) {
RedisStrictCommand<Map<String, String>> command = new RedisStrictCommand<Map<String, String>>("INFO", section, new StringMapDataDecoder());
Map<String, String> info = execute(node, command);
Properties result = new Properties();
for (Entry<String, String> entry : info.entrySet()) {
result.setProperty(entry.getKey(), entry.getValue());
}
return result;
}
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
RFuture<Collection<String>> f = executorService.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = syncFuture(f);
Set<byte[]> result = new HashSet<byte[]>();
for (String key : keys) {
result.add(key.getBytes(CharsetUtil.UTF_8));
}
return result;
}
@Override
public byte[] randomKey(RedisClusterNode node) {
MasterSlaveEntry entry = getEntry(node);
RFuture<byte[]> f = executorService.readRandomAsync(entry, ByteArrayCodec.INSTANCE, RedisCommands.RANDOM_KEY);
return syncFuture(f);
}
@Override
public void shutdown(RedisClusterNode node) {
MasterSlaveEntry entry = getEntry(node);
RFuture<Void> f = executorService.readAsync(entry, ByteArrayCodec.INSTANCE, RedisCommands.SHUTDOWN);
syncFuture(f);
}
@Override
public List<String> getConfig(RedisClusterNode node, String pattern) {
MasterSlaveEntry entry = getEntry(node);
RFuture<List<String>> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CONFIG_GET, pattern);
return syncFuture(f);
}
@Override
public void setConfig(RedisClusterNode node, String param, String value) {
MasterSlaveEntry entry = getEntry(node);
RFuture<Void> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CONFIG_SET, param, value);
syncFuture(f);
}
@Override
public void resetConfigStats(RedisClusterNode node) {
MasterSlaveEntry entry = getEntry(node);
RFuture<Void> f = executorService.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.CONFIG_RESETSTAT);
syncFuture(f);
}
@Override
public Long time(RedisClusterNode node) {
MasterSlaveEntry entry = getEntry(node);
RFuture<Long> f = executorService.readAsync(entry, LongCodec.INSTANCE, RedisCommands.TIME_LONG);
return syncFuture(f);
}
private static final StringToRedisClientInfoConverter CONVERTER = new StringToRedisClientInfoConverter();
@Override
public List<RedisClientInfo> getClientList(RedisClusterNode node) {
MasterSlaveEntry entry = getEntry(node);
RFuture<List<String>> f = executorService.readAsync(entry, StringCodec.INSTANCE, RedisCommands.CLIENT_LIST);
List<String> list = syncFuture(f);
return CONVERTER.convert(list.toArray(new String[list.size()]));
}
}

@ -0,0 +1,136 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.connection.SentinelConnectionManager;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisSentinelConnection;
/**
* Redisson based connection factory
*
* @author Nikita Koksharov
*
*/
public class RedissonConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean {
private final static Log log = LogFactory.getLog(RedissonConnectionFactory.class);
public static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
new PassThroughExceptionTranslationStrategy(new RedissonExceptionConverter());
private Config config;
private RedissonClient redisson;
/**
* Creates factory with default Redisson configuration
*/
public RedissonConnectionFactory() {
this(Redisson.create());
}
/**
* Creates factory with defined Redisson instance
*
* @param redisson - Redisson instance
*/
public RedissonConnectionFactory(RedissonClient redisson) {
this.redisson = redisson;
}
/**
* Creates factory with defined Redisson config
*
* @param config - Redisson config
*/
public RedissonConnectionFactory(Config config) {
super();
this.config = config;
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return EXCEPTION_TRANSLATION.translate(ex);
}
@Override
public void destroy() throws Exception {
}
@Override
public void afterPropertiesSet() throws Exception {
if (config != null) {
redisson = Redisson.create(config);
}
}
@Override
public RedisConnection getConnection() {
return new RedissonConnection(redisson);
}
@Override
public RedisClusterConnection getClusterConnection() {
if (!redisson.getConfig().isClusterConfig()) {
throw new InvalidDataAccessResourceUsageException("Redisson is not in Cluster mode");
}
return new RedissonClusterConnection(redisson);
}
@Override
public boolean getConvertPipelineAndTxResults() {
return true;
}
@Override
public RedisSentinelConnection getSentinelConnection() {
if (!redisson.getConfig().isSentinelConfig()) {
throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
}
SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
for (RedisClient client : manager.getSentinels()) {
org.redisson.client.RedisConnection connection = client.connect();
try {
String res = connection.sync(RedisCommands.PING);
if ("pong".equalsIgnoreCase(res)) {
return new RedissonSentinelConnection(connection);
}
} catch (Exception e) {
log.warn("Can't connect to " + client, e);
connection.closeAsync();
}
}
throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
}

@ -0,0 +1,62 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.RedisTimeoutException;
import org.springframework.core.convert.converter.Converter;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.ClusterRedirectException;
import org.springframework.data.redis.RedisConnectionFailureException;
/**
* Converts Redisson exceptions to Spring compatible
*
* @author Nikita Koksharov
*
*/
public class RedissonExceptionConverter implements Converter<Exception, DataAccessException> {
@Override
public DataAccessException convert(Exception source) {
if (source instanceof RedisConnectionException) {
return new RedisConnectionFailureException(source.getMessage(), source);
}
if (source instanceof RedisRedirectException) {
RedisRedirectException ex = (RedisRedirectException) source;
return new ClusterRedirectException(ex.getSlot(), ex.getUrl().getHost(), ex.getUrl().getPort(), source);
}
if (source instanceof RedisException) {
return new InvalidDataAccessApiUsageException(source.getMessage(), source);
}
if (source instanceof DataAccessException) {
return (DataAccessException) source;
}
if (source instanceof RedisTimeoutException) {
return new QueryTimeoutException(source.getMessage(), source);
}
return null;
}
}

@ -0,0 +1,91 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.springframework.data.redis.connection.NamedNode;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
import org.springframework.data.redis.connection.convert.Converters;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSentinelConnection implements RedisSentinelConnection {
private final RedisConnection connection;
public RedissonSentinelConnection(RedisConnection connection) {
this.connection = connection;
}
@Override
public void failover(NamedNode master) {
connection.sync(RedisCommands.SENTINEL_FAILOVER, master.getName());
}
private static List<RedisServer> toRedisServersList(List<Map<String, String>> source) {
List<RedisServer> servers = new ArrayList<RedisServer>(source.size());
for (Map<String, String> info : source) {
servers.add(RedisServer.newServerFrom(Converters.toProperties(info)));
}
return servers;
}
@Override
public Collection<RedisServer> masters() {
List<Map<String, String>> masters = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_MASTERS);
return toRedisServersList(masters);
}
@Override
public Collection<RedisServer> slaves(NamedNode master) {
List<Map<String, String>> slaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, master.getName());
return toRedisServersList(slaves);
}
@Override
public void remove(NamedNode master) {
connection.sync(RedisCommands.SENTINEL_REMOVE, master.getName());
}
@Override
public void monitor(RedisServer master) {
connection.sync(RedisCommands.SENTINEL_MONITOR, master.getName(), master.getHost(),
master.getPort().intValue(), master.getQuorum().intValue());
}
@Override
public void close() throws IOException {
connection.closeAsync();
}
@Override
public boolean isOpen() {
return !connection.isClosed();
}
}

@ -0,0 +1,113 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.subscribeService = subscribeService;
}
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {
return;
}
DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), (byte[])message);
getListener().onMessage(msg, null);
}
});
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
}
}
@Override
protected void doUnsubscribe(boolean all, byte[]... channels) {
for (byte[] channel : channels) {
subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE);
}
}
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), (byte[])message);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
}
}
@Override
protected void doPUnsubscribe(boolean all, byte[]... patterns) {
for (byte[] pattern : patterns) {
subscribeService.unsubscribe(new ChannelName(pattern), PubSubType.PUNSUBSCRIBE);
}
}
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
}
}

@ -0,0 +1,52 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.redis.connection.DefaultTuple;
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
/**
*
* @author Nikita Koksharov
*
*/
public class ScoredSortedListReplayDecoder implements MultiDecoder<List<Tuple>> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
@Override
public List<Tuple> decode(List<Object> parts, State state) {
List<Tuple> result = new ArrayList<Tuple>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new DefaultTuple((byte[])parts.get(i), ((Number)parts.get(i+1)).doubleValue()));
}
return result;
}
}

@ -0,0 +1,53 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.redis.connection.DefaultTuple;
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
/**
*
* @author Nikita Koksharov
*
*/
public class ScoredSortedSetReplayDecoder implements MultiDecoder<Set<Tuple>> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
@Override
public Set<Tuple> decode(List<Object> parts, State state) {
Set<Tuple> result = new LinkedHashSet<Tuple>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new DefaultTuple((byte[])parts.get(i), ((Number)parts.get(i+1)).doubleValue()));
}
return result;
}
}

@ -0,0 +1,43 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor implements Convertor<Long> {
private final TimeUnit unit;
private final TimeUnit source;
public SecondsConvertor(TimeUnit unit, TimeUnit source) {
super();
this.unit = unit;
this.source = source;
}
@Override
public Long convert(Object obj) {
return unit.convert((Long)obj, source);
}
}

@ -0,0 +1,50 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
/**
*
* @author Nikita Koksharov
*
*/
public class SetReplayDecoder<T> implements MultiDecoder<Set<T>> {
private final Decoder<Object> decoder;
public SetReplayDecoder(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
@Override
public Set<T> decode(List<Object> parts, State state) {
return new LinkedHashSet(parts);
}
}

@ -0,0 +1,75 @@
package org.redisson;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
public abstract class BaseTest {
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
RedisRunner.startDefaultRedisServerInstance();
defaultRedisson = createInstance();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
defaultRedisson.shutdown();
try {
RedisRunner.shutDownDefaultRedisServerInstance();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@Before
public void before() throws IOException, InterruptedException {
if (redisson == null) {
redisson = defaultRedisson;
}
if (flushBetweenTests()) {
redisson.getKeys().flushall();
}
}
@After
public void after() throws InterruptedException {
}
public static Config createConfig() {
// String redisAddress = System.getProperty("redisAddress");
// if (redisAddress == null) {
// redisAddress = "127.0.0.1:6379";
// }
Config config = new Config();
// config.setCodec(new MsgPackJacksonCodec());
// config.useSentinelServers().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389");
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001", "127.0.0.1:7000");
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
// .setPassword("mypass1");
// config.useMasterSlaveConnection()
// .setMasterAddress("127.0.0.1:6379")
// .addSlaveAddress("127.0.0.1:6399")
// .addSlaveAddress("127.0.0.1:6389");
return config;
}
public static RedissonClient createInstance() {
Config config = createConfig();
return Redisson.create(config);
}
protected boolean flushBetweenTests() {
return true;
}
}

@ -0,0 +1,156 @@
package org.redisson;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.redisson.misc.BiHashMap;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class ClusterRunner {
private final LinkedHashMap<RedisRunner, String> nodes = new LinkedHashMap<>();
private final LinkedHashMap<String, String> slaveMasters = new LinkedHashMap<>();
public ClusterRunner addNode(RedisRunner runner) {
nodes.putIfAbsent(runner, getRandomId());
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_ENABLED)) {
runner.clusterEnabled(true);
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.CLUSTER_NODE_TIMEOUT)) {
runner.clusterNodeTimeout(5000);
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.PORT)) {
runner.randomPort(1);
runner.port(RedisRunner.findFreePort());
}
if (!runner.hasOption(RedisRunner.REDIS_OPTIONS.BIND)) {
runner.bind("127.0.0.1");
}
return this;
}
public ClusterRunner addNode(RedisRunner master, RedisRunner... slaves) {
addNode(master);
for (RedisRunner slave : slaves) {
addNode(slave);
slaveMasters.put(nodes.get(slave), nodes.get(master));
}
return this;
}
public synchronized ClusterProcesses run() throws IOException, InterruptedException, RedisRunner.FailedToStartRedisException {
BiHashMap<String, RedisRunner.RedisProcess> processes = new BiHashMap<>();
for (RedisRunner runner : nodes.keySet()) {
List<String> options = getClusterConfig(runner);
String confFile = runner.dir() + File.separator + nodes.get(runner) + ".conf";
System.out.println("WRITING CONFIG: for " + nodes.get(runner));
try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) {
options.stream().forEach((line) -> {
printer.println(line);
System.out.println(line);
});
}
processes.put(nodes.get(runner), runner.clusterConfigFile(confFile).run());
}
Thread.sleep(1000);
for (RedisRunner.RedisProcess process : processes.valueSet()) {
if (!process.isAlive()) {
throw new RedisRunner.FailedToStartRedisException();
}
}
return new ClusterProcesses(processes);
}
private List<String> getClusterConfig(RedisRunner runner) {
String me = runner.getInitialBindAddr() + ":" + runner.getPort();
List<String> nodeConfig = new ArrayList<>();
int c = 0;
for (RedisRunner node : nodes.keySet()) {
String nodeId = nodes.get(node);
StringBuilder sb = new StringBuilder();
String nodeAddr = node.getInitialBindAddr() + ":" + node.getPort();
sb.append(nodeId).append(" ");
sb.append(nodeAddr).append(" ");
sb.append(me.equals(nodeAddr)
? "myself,"
: "");
boolean isMaster = !slaveMasters.containsKey(nodeId);
if (isMaster) {
sb.append("master -");
} else {
sb.append("slave ").append(slaveMasters.get(nodeId));
}
sb.append(" ");
sb.append("0").append(" ");
sb.append(me.equals(nodeAddr)
? "0"
: "1").append(" ");
sb.append(c + 1).append(" ");
sb.append("connected ");
if (isMaster) {
sb.append(getSlots(c, nodes.size() - slaveMasters.size()));
c++;
}
nodeConfig.add(sb.toString());
}
nodeConfig.add("vars currentEpoch 0 lastVoteEpoch 0");
return nodeConfig;
}
private static String getSlots(int index, int groupNum) {
final double t = 16383;
int start = index == 0 ? 0 : (int) (t / groupNum * index);
int end = index == groupNum - 1 ? (int) t : (int) (t / groupNum * (index + 1)) - 1;
return start + "-" + end;
}
private static String getRandomId() {
final SecureRandom r = new SecureRandom();
return new BigInteger(160, r).toString(16);
}
public static class ClusterProcesses {
private final BiHashMap<String, RedisRunner.RedisProcess> processes;
private ClusterProcesses(BiHashMap<String, RedisRunner.RedisProcess> processes) {
this.processes = processes;
}
public RedisRunner.RedisProcess getProcess(String nodeId) {
return processes.get(nodeId);
}
public String getNodeId(RedisRunner.RedisProcess process) {
return processes.reverseGet(process);
}
public Set<RedisRunner.RedisProcess> getNodes() {
return processes.valueSet();
}
public Set<String> getNodeIds() {
return processes.keySet();
}
public synchronized Map<String, Integer> shutdown() {
return processes
.entrySet()
.stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().stop()));
}
}
}

@ -0,0 +1,58 @@
package org.redisson;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedisVersion implements Comparable<RedisVersion>{
private final String fullVersion;
private final Integer majorVersion;
private final Integer minorVersion;
private final Integer patchVersion;
public RedisVersion(String fullVersion) {
this.fullVersion = fullVersion;
Matcher matcher = Pattern.compile("^([\\d]+)\\.([\\d]+)\\.([\\d]+)$").matcher(fullVersion);
matcher.find();
majorVersion = Integer.parseInt(matcher.group(1));
minorVersion = Integer.parseInt(matcher.group(2));
patchVersion = Integer.parseInt(matcher.group(3));
}
public String getFullVersion() {
return fullVersion;
}
public int getMajorVersion() {
return majorVersion;
}
public int getMinorVersion() {
return minorVersion;
}
public int getPatchVersion() {
return patchVersion;
}
@Override
public int compareTo(RedisVersion o) {
int ma = this.majorVersion.compareTo(o.majorVersion);
int mi = this.minorVersion.compareTo(o.minorVersion);
int pa = this.patchVersion.compareTo(o.patchVersion);
return ma != 0 ? ma : mi != 0 ? mi : pa;
}
public int compareTo(String redisVersion) {
return this.compareTo(new RedisVersion(redisVersion));
}
public static int compareTo(String redisVersion1, String redisVersion2) {
return new RedisVersion(redisVersion1).compareTo(redisVersion2);
}
}

@ -0,0 +1,21 @@
package org.redisson;
import java.util.Locale;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonRuntimeEnvironment {
public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv"));
public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Devel\\projects\\redis\\Redis-x64-3.2.100\\redis-server.exe");
public static final String tempDir = System.getProperty("java.io.tmpdir");
public static final String OS;
public static final boolean isWindows;
static {
OS = System.getProperty("os.name", "generic");
isWindows = OS.toLowerCase(Locale.ENGLISH).contains("win");
}
}

@ -0,0 +1,16 @@
package org.redisson.spring.data.connection;
import org.junit.Before;
import org.redisson.BaseTest;
import org.springframework.data.redis.connection.RedisConnection;
public abstract class BaseConnectionTest extends BaseTest {
RedisConnection connection;
@Before
public void init() {
connection = new RedissonConnection(redisson);
}
}

@ -0,0 +1,222 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.assertj.core.api.Assertions.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
public class RedissonClusterConnectionTest {
static RedissonClient redisson;
static RedissonClusterConnection connection;
static ClusterProcesses process;
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@AfterClass
public static void after() {
process.shutdown();
redisson.shutdown();
}
@Test
public void testClusterGetNodes() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
assertThat(nodes).hasSize(6);
for (RedisClusterNode redisClusterNode : nodes) {
assertThat(redisClusterNode.getLinkState()).isNotNull();
assertThat(redisClusterNode.getFlags()).isNotEmpty();
assertThat(redisClusterNode.getHost()).isNotNull();
assertThat(redisClusterNode.getPort()).isNotNull();
assertThat(redisClusterNode.getId()).isNotNull();
assertThat(redisClusterNode.getType()).isNotNull();
if (redisClusterNode.getType() == NodeType.MASTER) {
assertThat(redisClusterNode.getSlotRange().getSlots()).isNotEmpty();
} else {
assertThat(redisClusterNode.getMasterId()).isNotNull();
}
}
}
@Test
public void testClusterGetNodesMaster() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
for (RedisClusterNode redisClusterNode : nodes) {
if (redisClusterNode.getType() == NodeType.MASTER) {
Collection<RedisClusterNode> slaves = connection.clusterGetSlaves(redisClusterNode);
assertThat(slaves).hasSize(1);
}
}
}
@Test
public void testClusterGetMasterSlaveMap() {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterSlaveMap();
assertThat(map).hasSize(3);
for (Collection<RedisClusterNode> slaves : map.values()) {
assertThat(slaves).hasSize(1);
}
}
@Test
public void testClusterGetSlotForKey() {
Integer slot = connection.clusterGetSlotForKey("123".getBytes());
assertThat(slot).isNotNull();
}
@Test
public void testClusterGetNodeForSlot() {
RedisClusterNode node1 = connection.clusterGetNodeForSlot(1);
RedisClusterNode node2 = connection.clusterGetNodeForSlot(16000);
assertThat(node1.getId()).isNotEqualTo(node2.getId());
}
@Test
public void testClusterGetNodeForKey() {
RedisClusterNode node = connection.clusterGetNodeForKey("123".getBytes());
assertThat(node).isNotNull();
}
@Test
public void testClusterGetClusterInfo() {
ClusterInfo info = connection.clusterGetClusterInfo();
assertThat(info.getSlotsFail()).isEqualTo(0);
assertThat(info.getSlotsOk()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
assertThat(info.getSlotsAssigned()).isEqualTo(MasterSlaveConnectionManager.MAX_SLOT);
}
@Test
public void testClusterAddRemoveSlots() {
RedisClusterNode master = getFirstMaster();
Integer slot = master.getSlotRange().getSlots().iterator().next();
connection.clusterDeleteSlots(master, slot);
connection.clusterAddSlots(master, slot);
}
@Test
public void testClusterCountKeysInSlot() {
Long t = connection.clusterCountKeysInSlot(1);
assertThat(t).isZero();
}
@Test
public void testClusterMeetForget() {
RedisClusterNode master = getFirstMaster();
connection.clusterForget(master);
connection.clusterMeet(master);
}
@Test
public void testClusterGetKeysInSlot() {
List<byte[]> keys = connection.clusterGetKeysInSlot(12, 10);
assertThat(keys).isEmpty();
}
@Test
public void testClusterPing() {
RedisClusterNode master = getFirstMaster();
String res = connection.ping(master);
assertThat(res).isEqualTo("PONG");
}
@Test
public void testDbSize() {
RedisClusterNode master = getFirstMaster();
Long size = connection.dbSize(master);
assertThat(size).isZero();
}
@Test
public void testInfo() {
RedisClusterNode master = getFirstMaster();
Properties info = connection.info(master);
assertThat(info.size()).isGreaterThan(10);
}
@Test
public void testResetConfigStats() {
RedisClusterNode master = getFirstMaster();
connection.resetConfigStats(master);
}
@Test
public void testTime() {
RedisClusterNode master = getFirstMaster();
Long time = connection.time(master);
assertThat(time).isGreaterThan(1000);
}
@Test
public void testGetClientList() {
RedisClusterNode master = getFirstMaster();
List<RedisClientInfo> list = connection.getClientList(master);
assertThat(list.size()).isGreaterThan(10);
}
@Test
public void testSetConfig() {
RedisClusterNode master = getFirstMaster();
connection.setConfig(master, "timeout", "10");
}
@Test
public void testGetConfig() {
RedisClusterNode master = getFirstMaster();
List<String> config = connection.getConfig(master, "*");
assertThat(config.size()).isGreaterThan(20);
}
protected RedisClusterNode getFirstMaster() {
Map<RedisClusterNode, Collection<RedisClusterNode>> map = connection.clusterGetMasterSlaveMap();
RedisClusterNode master = map.keySet().iterator().next();
return master;
}
}

@ -0,0 +1,26 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
public class RedissonConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
assertThat(connection.echo("test".getBytes())).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isTrue();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,45 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.junit.Test;
import org.redisson.BaseTest;
public class RedissonMultiConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.exec().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.isQueueing()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat(connection.isQueueing()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.multi();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.exec();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,46 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.junit.Test;
import org.redisson.BaseTest;
public class RedissonPipelineConnectionTest extends BaseConnectionTest {
@Test
public void testEcho() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.echo("test".getBytes())).isNull();
assertThat(connection.closePipeline().iterator().next()).isEqualTo("test".getBytes());
}
@Test
public void testSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.isPipelined()).isTrue();
connection.set("key".getBytes(), "value".getBytes());
assertThat(connection.get("key".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat(connection.isPipelined()).isFalse();
assertThat(result.get(0)).isEqualTo("value".getBytes());
}
@Test
public void testHSetGet() {
RedissonConnection connection = new RedissonConnection(redisson);
connection.openPipeline();
assertThat(connection.hSet("key".getBytes(), "field".getBytes(), "value".getBytes())).isNull();
assertThat(connection.hGet("key".getBytes(), "field".getBytes())).isNull();
List<Object> result = connection.closePipeline();
assertThat((Boolean)result.get(0)).isTrue();
assertThat(result.get(1)).isEqualTo("value".getBytes());
}
}

@ -0,0 +1,132 @@
package org.redisson.spring.data.connection;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Collection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
public class RedissonSentinelConnectionTest {
RedissonClient redisson;
RedisSentinelConnection connection;
RedisRunner.RedisProcess master;
RedisRunner.RedisProcess slave1;
RedisRunner.RedisProcess slave2;
RedisRunner.RedisProcess sentinel1;
RedisRunner.RedisProcess sentinel2;
RedisRunner.RedisProcess sentinel3;
@Before
public void before() throws FailedToStartRedisException, IOException, InterruptedException {
master = new RedisRunner()
.nosave()
.randomDir()
.run();
slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
connection = factory.getSentinelConnection();
}
@After
public void after() {
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
redisson.shutdown();
}
@Test
public void testMasters() {
Collection<RedisServer> masters = connection.masters();
assertThat(masters).hasSize(1);
}
@Test
public void testSlaves() {
Collection<RedisServer> masters = connection.masters();
Collection<RedisServer> slaves = connection.slaves(masters.iterator().next());
assertThat(slaves).hasSize(2);
}
@Test
public void testRemove() {
Collection<RedisServer> masters = connection.masters();
connection.remove(masters.iterator().next());
}
@Test
public void testMonitor() {
Collection<RedisServer> masters = connection.masters();
RedisServer master = masters.iterator().next();
master.setName(master.getName() + ":");
connection.monitor(master);
}
@Test
public void testFailover() throws InterruptedException {
Collection<RedisServer> masters = connection.masters();
connection.failover(masters.iterator().next());
Thread.sleep(10000);
RedisServer newMaster = connection.masters().iterator().next();
assertThat(masters.iterator().next().getPort()).isNotEqualTo(newMaster.getPort());
}
}

@ -0,0 +1,54 @@
package org.redisson.spring.data.connection;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
connection.publish("test".getBytes(), "msg".getBytes());
}
@Test
public void testUnSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
msg.set(message.getBody());
}
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
}
}
Loading…
Cancel
Save