Initial implementation of Spring Data Redis integration. #1373

pull/1547/head
Nikita 7 years ago
parent 589f29be94
commit d9ad010505

@ -267,6 +267,13 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.13.RELEASE</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session</artifactId>

@ -79,7 +79,6 @@ import org.redisson.cluster.ClusterNodeInfo;
public interface RedisCommands {
RedisStrictCommand<Long> GEOADD = new RedisStrictCommand<Long>("GEOADD");
RedisStrictCommand<Long> GEOADD_ENTRIES = new RedisStrictCommand<Long>("GEOADD");
RedisCommand<Double> GEODIST = new RedisCommand<Double>("GEODIST", new DoubleReplayConvertor());
RedisCommand<List<Object>> GEORADIUS = new RedisCommand<List<Object>>("GEORADIUS", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> GEORADIUSBYMEMBER = new RedisCommand<List<Object>>("GEORADIUSBYMEMBER", new ObjectListReplayDecoder<Object>());
@ -111,6 +110,7 @@ public interface RedisCommands {
RedisCommand<Boolean> ZADD_RAW = new RedisCommand<Boolean>("ZADD");
RedisStrictCommand<Integer> ZADD_INT = new RedisStrictCommand<Integer>("ZADD", new IntegerReplayConvertor());
RedisCommand<Long> ZADD = new RedisCommand<Long>("ZADD");
RedisStrictCommand<Long> ZREM_LONG = new RedisStrictCommand<Long>("ZREM");
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor());
RedisStrictCommand<Integer> ZCARD_INT = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());
RedisStrictCommand<Long> ZCARD = new RedisStrictCommand<Long>("ZCARD");
@ -118,7 +118,9 @@ public interface RedisCommands {
RedisStrictCommand<Integer> ZLEXCOUNT = new RedisStrictCommand<Integer>("ZLEXCOUNT", new IntegerReplayConvertor());
RedisCommand<Boolean> ZSCORE_CONTAINS = new RedisCommand<Boolean>("ZSCORE", new BooleanNotNullReplayConvertor());
RedisStrictCommand<Double> ZSCORE = new RedisStrictCommand<Double>("ZSCORE", new DoubleReplayConvertor());
RedisStrictCommand<Long> ZRANK = new RedisStrictCommand<Long>("ZRANK");
RedisCommand<Integer> ZRANK_INT = new RedisCommand<Integer>("ZRANK", new IntegerReplayConvertor());
RedisStrictCommand<Long> ZREVRANK = new RedisStrictCommand<Long>("ZREVRANK");
RedisCommand<Integer> ZREVRANK_INT = new RedisCommand<Integer>("ZREVRANK", new IntegerReplayConvertor());
RedisCommand<Object> ZRANGE_SINGLE = new RedisCommand<Object>("ZRANGE", new ListFirstObjectDecoder());
RedisStrictCommand<Double> ZRANGE_SINGLE_SCORE = new RedisStrictCommand<Double>("ZRANGE", new ObjectFirstScoreReplayDecoder());
@ -149,6 +151,7 @@ public interface RedisCommands {
RedisStrictCommand<Void> UNWATCH = new RedisStrictCommand<Void>("UNWATCH", new VoidReplayConvertor());
RedisStrictCommand<Void> WATCH = new RedisStrictCommand<Void>("WATCH", new VoidReplayConvertor());
RedisStrictCommand<Void> MULTI = new RedisStrictCommand<Void>("MULTI", new VoidReplayConvertor());
RedisStrictCommand<Void> DISCARD = new RedisStrictCommand<Void>("DISCARD", new VoidReplayConvertor());
RedisCommand<List<Object>> EXEC = new RedisCommand<List<Object>>("EXEC", new ObjectListReplayDecoder<Object>());
RedisCommand<Boolean> SADD_BOOL = new RedisCommand<Boolean>("SADD", new BooleanAmountReplayConvertor());
@ -159,6 +162,7 @@ public interface RedisCommands {
RedisCommand<Boolean> SREM_SINGLE = new RedisCommand<Boolean>("SREM", new BooleanAmountReplayConvertor());
RedisCommand<Boolean> SMOVE = new RedisCommand<Boolean>("SMOVE", new BooleanReplayConvertor());
RedisCommand<Set<Object>> SMEMBERS = new RedisCommand<Set<Object>>("SMEMBERS", new ObjectSetReplayDecoder<Object>());
RedisCommand<List<Object>> SRANDMEMBER = new RedisCommand<List<Object>>("SRANDMEMBER", new ObjectListReplayDecoder<Object>());
RedisCommand<Object> SRANDMEMBER_SINGLE = new RedisCommand<Object>("SRANDMEMBER");
RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()));
RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()));
@ -182,7 +186,6 @@ public interface RedisCommands {
RedisCommand<Object> LINDEX = new RedisCommand<Object>("LINDEX");
RedisCommand<Integer> LINSERT_INT = new RedisCommand<Integer>("LINSERT", new IntegerReplayConvertor());
RedisStrictCommand<Integer> LLEN_INT = new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor());
RedisStrictCommand<Long> LLEN = new RedisStrictCommand<Long>("LLEN");
RedisStrictCommand<Void> LTRIM = new RedisStrictCommand<Void>("LTRIM", new VoidReplayConvertor());
RedisStrictCommand<Boolean> PEXPIRE = new RedisStrictCommand<Boolean>("PEXPIRE", new BooleanReplayConvertor());
@ -192,13 +195,16 @@ public interface RedisCommands {
RedisCommand<Object> RPOPLPUSH = new RedisCommand<Object>("RPOPLPUSH");
RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH");
RedisCommand<List<Object>> BLPOP = new RedisCommand<List<Object>>("BLPOP", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> BRPOP = new RedisCommand<List<Object>>("BRPOP", new ObjectListReplayDecoder<Object>());
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new ListObjectDecoder<Object>(1));
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new ListObjectDecoder<Object>(1));
RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder());
Set<String> BLOCKING_COMMANDS = new HashSet<String>(
Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName()));
Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName(),
BLPOP.getName(), BRPOP.getName()));
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor());
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");
@ -208,7 +214,7 @@ public interface RedisCommands {
RedisCommand<Set<Object>> SORT_SET = new RedisCommand<Set<Object>>("SORT", new ObjectSetReplayDecoder<Object>());
RedisCommand<Integer> SORT_TO = new RedisCommand<Integer>("SORT", new IntegerReplayConvertor());
RedisStrictCommand<Long> RPOP = new RedisStrictCommand<Long>("RPOP");
RedisCommand<Object> RPOP = new RedisCommand<Object>("RPOP");
RedisCommand<Integer> LPUSH = new RedisCommand<Integer>("LPUSH", new IntegerReplayConvertor());
RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
RedisStrictCommand<Void> LPUSH_VOID = new RedisStrictCommand<Void>("LPUSH", new VoidReplayConvertor());
@ -297,9 +303,6 @@ public interface RedisCommands {
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor());
RedisStrictCommand<Double> GET_DOUBLE = new RedisStrictCommand<Double>("GET", new DoubleNullSafeReplayConvertor());
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET");
RedisCommand<Object> GETRANGE = new RedisCommand<Object>("GETRANGE");
RedisCommand<Object> APPEND = new RedisCommand<Object>("APPEND");
RedisCommand<Object> SETRANGE = new RedisCommand<Object>("SETRANGE");
RedisCommand<Void> SET = new RedisCommand<Void>("SET", new VoidReplayConvertor());
RedisCommand<Boolean> SETPXNX = new RedisCommand<Boolean>("SET", new BooleanNotNullReplayConvertor());
RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor());

@ -18,6 +18,7 @@ package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -61,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.resolver.AddressResolver;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -672,6 +674,31 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return configEndpointHostName;
}
private int indexOf(byte[] array, byte element) {
for (int i = 0; i < array.length + 1; ++i) {
if (array[i] == element) {
return i;
}
}
return -1;
}
@Override
public int calcSlot(byte[] key) {
if (key == null) {
return 0;
}
int start = indexOf(key, (byte)'{');
if (start != -1) {
int end = indexOf(key, (byte)'}');
key = Arrays.copyOfRange(key, start+1, end);
}
int result = CRC16.crc16(key) % MAX_SLOT;
return result;
}
@Override
public int calcSlot(String key) {
if (key == null) {

@ -61,6 +61,8 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
@ -79,12 +81,18 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
<R extends Collection<Object>, T> RFuture<R> readAllAsync(RedisCommand<T> command, R results, Object... params);
<T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params);
<R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);
<T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object ... params);

@ -210,6 +210,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
public <T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
@ -220,19 +227,25 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = createPromise();
List<R> results = new ArrayList<R>();
return readAllAsync(command, results, params);
}
@Override
public <R extends Collection<Object>, T> RFuture<R> readAllAsync(RedisCommand<T> command, final R results,
Object... params) {
final RPromise<R> mainPromise = createPromise();
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final List<R> results = new ArrayList<R>();
final AtomicInteger counter = new AtomicInteger(nodes.size());
FutureListener<R> listener = new FutureListener<R>() {
FutureListener<Object> listener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) {
mainPromise.tryFailure(future.cause());
return;
}
R result = future.getNow();
Object result = future.getNow();
if (result instanceof Collection) {
synchronized (results) {
results.addAll((Collection) result);
@ -301,15 +314,20 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params) {
return allAsync(false, command, callback, params);
return allAsync(false, connectionManager.getCodec(), command, callback, params);
}
@Override
public <R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... params) {
return allAsync(false, codec, command, callback, params);
}
@Override
public <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params) {
return allAsync(true, command, callback, params);
return allAsync(true, connectionManager.getCodec(), command, callback, params);
}
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, final RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, Codec codec, final RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
final RPromise<R> mainPromise = new RedissonPromise<R>();
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(nodes.size());
@ -342,7 +360,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : nodes) {
RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null);
async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true, null);
}
return mainPromise;
}
@ -359,6 +377,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return new NodeSource(entry);
}
private NodeSource getNodeSource(byte[] key) {
int slot = connectionManager.calcSlot(key);
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return new NodeSource(entry);
}
@Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
@ -366,6 +390,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
async(true, source, codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
@ -475,6 +507,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
public <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt,
final boolean ignoreRedirect, final RFuture<RedisConnection> connFuture) {
@ -877,7 +916,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
((ScanResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
}
handleSuccess(details.getMainPromise(), details.getCommand(), res);
handleSuccess(details, details.getMainPromise(), details.getCommand(), res);
} else {
handleError(details, details.getMainPromise(), future.cause());
}
@ -893,7 +932,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
mainPromise.tryFailure(cause);
}
protected <R> void handleSuccess(RPromise<R> promise, RedisCommand<?> command, R res) {
protected <V, R> void handleSuccess(AsyncDetails<V, R> details, RPromise<R> promise, RedisCommand<?> command, R res) {
if (isRedissonReferenceSupportEnabled()) {
handleReference(promise, res);
} else {

@ -143,6 +143,10 @@ public class CommandBatchService extends CommandAsyncService {
super(connectionManager);
this.options = options;
}
public BatchOptions getOptions() {
return options;
}
public void add(RFuture<?> future, List<CommandBatchService> services) {
nestedServices.put(future, services);
@ -214,16 +218,28 @@ public class CommandBatchService extends CommandAsyncService {
}
@Override
protected <R> void handleSuccess(RPromise<R> promise, RedisCommand<?> command, R res) {
protected <V, R> void handleSuccess(final AsyncDetails<V, R> details, RPromise<R> promise, RedisCommand<?> command, R res) {
if (RedisCommands.EXEC.getName().equals(command.getName())) {
super.handleSuccess(promise, command, res);
super.handleSuccess(details, promise, command, res);
return;
}
if (RedisCommands.DISCARD.getName().equals(command.getName())) {
super.handleSuccess(details, promise, command, null);
if (executed.compareAndSet(false, true)) {
details.getConnectionFuture().getNow().forceFastReconnectAsync().addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
CommandBatchService.super.releaseConnection(details.getSource(), details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise(), details);
}
});
}
return;
}
if (isRedisBasedQueue()) {
BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleSuccess(sentPromise, command, null);
super.handleSuccess(details, sentPromise, command, null);
semaphore.release();
}
}

@ -68,6 +68,8 @@ public interface ConnectionManager {
IdleConnectionWatcher getConnectionWatcher();
int calcSlot(String key);
int calcSlot(byte[] key);
MasterSlaveServersConfig getConfig();

@ -471,6 +471,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return singleSlotRange.getStartSlot();
}
@Override
public int calcSlot(byte[] key) {
return singleSlotRange.getStartSlot();
}
@Override
public MasterSlaveEntry getEntry(InetSocketAddress address) {

@ -33,8 +33,11 @@ import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
*/
public class PubSubConnectionEntry {
private final AtomicInteger subscribedChannelsAmount;

@ -0,0 +1,34 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.springframework.data.redis.connection.DataType;
/**
*
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor extends SingleConvertor<DataType> {
@Override
public DataType convert(Object obj) {
String val = obj.toString();
return DataType.fromCode(val);
}
}

@ -0,0 +1,41 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metric;
/**
*
* @author Nikita Koksharov
*
*/
public class DistanceConvertor extends SingleConvertor<Distance> {
private final Metric metric;
public DistanceConvertor(Metric metric) {
super();
this.metric = metric;
}
@Override
public Distance convert(Object obj) {
return new Distance((Double)obj, metric);
}
}

@ -0,0 +1,77 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation;
/**
*
* @author Nikita Koksharov
*
*/
public class GeoResultsDecoder implements MultiDecoder<GeoResults<GeoLocation<byte[]>>> {
private final Metric metric;
public GeoResultsDecoder() {
this(null);
}
public GeoResultsDecoder(Metric metric) {
super();
this.metric = metric;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return ListMultiDecoder.RESET;
}
@Override
public GeoResults<GeoLocation<byte[]>> decode(List<Object> parts, State state) {
List<GeoResult<GeoLocation<byte[]>>> result = new ArrayList<GeoResult<GeoLocation<byte[]>>>();
for (Object object : parts) {
if (object instanceof List) {
List<Object> vals = ((List<Object>) object);
if (metric != null) {
GeoLocation<byte[]> location = new GeoLocation<byte[]>((byte[])vals.get(0), null);
result.add(new GeoResult<GeoLocation<byte[]>>(location, new Distance((Double)vals.get(1), metric)));
} else {
GeoLocation<byte[]> location = new GeoLocation<byte[]>((byte[])vals.get(0), (Point)vals.get(1));
result.add(new GeoResult<GeoLocation<byte[]>>(location, null));
}
} else {
GeoLocation<byte[]> location = new GeoLocation<byte[]>((byte[])object, null);
result.add(new GeoResult<GeoLocation<byte[]>>(location, null));
}
}
return new GeoResults<GeoLocation<byte[]>>(result);
}
}

@ -0,0 +1,54 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation;
/**
*
* @author Nikita Koksharov
*
*/
public class GeoResultsDistanceDecoder implements MultiDecoder<GeoResults<GeoLocation<byte[]>>> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return ListMultiDecoder.RESET;
}
@Override
public GeoResults<GeoLocation<byte[]>> decode(List<Object> parts, State state) {
List<GeoResult<GeoLocation<byte[]>>> result = new ArrayList<GeoResult<GeoLocation<byte[]>>>();
for (Object object : parts) {
List<Object> vals = ((List<Object>) object);
GeoLocation<byte[]> location = new GeoLocation<byte[]>((byte[])vals.get(0), (Point)vals.get(1));
result.add(new GeoResult<GeoLocation<byte[]>>(location, null));
}
return new GeoResults<GeoLocation<byte[]>>(result);
}
}

@ -0,0 +1,49 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.geo.Point;
/**
*
* @author Nikita Koksharov
*
*/
public class PointDecoder implements MultiDecoder<Point> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
@Override
public Point decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
Double longitude = (Double)parts.get(0);
Double latitude = (Double)parts.get(1);
return new Point(longitude, latitude);
}
}

@ -0,0 +1,252 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisClusterNode.SlotRange;
import org.springframework.data.redis.core.types.RedisClientInfo;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonClusterConnection extends RedissonConnection implements RedisClusterConnection {
public RedissonClusterConnection(RedissonClient redisson) {
super(redisson);
}
@Override
public Iterable<RedisClusterNode> clusterGetNodes() {
// TODO Auto-generated method stub
return null;
}
@Override
public Collection<RedisClusterNode> clusterGetSlaves(RedisClusterNode master) {
// TODO Auto-generated method stub
return null;
}
@Override
public Map<RedisClusterNode, Collection<RedisClusterNode>> clusterGetMasterSlaveMap() {
// TODO Auto-generated method stub
return null;
}
@Override
public Integer clusterGetSlotForKey(byte[] key) {
// TODO Auto-generated method stub
return null;
}
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {
// TODO Auto-generated method stub
return null;
}
@Override
public RedisClusterNode clusterGetNodeForKey(byte[] key) {
// TODO Auto-generated method stub
return null;
}
@Override
public ClusterInfo clusterGetClusterInfo() {
// TODO Auto-generated method stub
return null;
}
@Override
public void clusterAddSlots(RedisClusterNode node, int... slots) {
// TODO Auto-generated method stub
}
@Override
public void clusterAddSlots(RedisClusterNode node, SlotRange range) {
// TODO Auto-generated method stub
}
@Override
public Long clusterCountKeysInSlot(int slot) {
// TODO Auto-generated method stub
return null;
}
@Override
public void clusterDeleteSlots(RedisClusterNode node, int... slots) {
// TODO Auto-generated method stub
}
@Override
public void clusterDeleteSlotsInRange(RedisClusterNode node, SlotRange range) {
// TODO Auto-generated method stub
}
@Override
public void clusterForget(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public void clusterMeet(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public void clusterSetSlot(RedisClusterNode node, int slot, AddSlots mode) {
// TODO Auto-generated method stub
}
@Override
public List<byte[]> clusterGetKeysInSlot(int slot, Integer count) {
// TODO Auto-generated method stub
return null;
}
@Override
public void clusterReplicate(RedisClusterNode master, RedisClusterNode slave) {
// TODO Auto-generated method stub
}
@Override
public String ping(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
}
@Override
public void bgReWriteAof(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public void bgSave(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public Long lastSave(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
}
@Override
public void save(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public Long dbSize(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
}
@Override
public void flushDb(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public void flushAll(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public Properties info(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
}
@Override
public Properties info(RedisClusterNode node, String section) {
// TODO Auto-generated method stub
return null;
}
@Override
public Set<byte[]> keys(RedisClusterNode node, byte[] pattern) {
// TODO Auto-generated method stub
return null;
}
@Override
public byte[] randomKey(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
}
@Override
public void shutdown(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public List<String> getConfig(RedisClusterNode node, String pattern) {
// TODO Auto-generated method stub
return null;
}
@Override
public void setConfig(RedisClusterNode node, String param, String value) {
// TODO Auto-generated method stub
}
@Override
public void resetConfigStats(RedisClusterNode node) {
// TODO Auto-generated method stub
}
@Override
public Long time(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
}
@Override
public List<RedisClientInfo> getClientList(RedisClusterNode node) {
// TODO Auto-generated method stub
return null;
}
}

@ -0,0 +1,93 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisSentinelConnection;
/**
* Redisson based connection factory
*
* @author Nikita Koksharov
*
*/
public class RedissonConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean {
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
new PassThroughExceptionTranslationStrategy(new RedissonExceptionConverter());
private Config config;
private RedissonClient redisson;
public RedissonConnectionFactory(RedissonClient redisson) {
this.redisson = redisson;
}
public RedissonConnectionFactory(Config config) {
super();
this.config = config;
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return EXCEPTION_TRANSLATION.translate(ex);
}
@Override
public void destroy() throws Exception {
redisson.shutdown();
}
@Override
public void afterPropertiesSet() throws Exception {
if (config != null) {
redisson = Redisson.create(config);
}
}
@Override
public RedisConnection getConnection() {
return new RedissonConnection(redisson);
}
@Override
public RedisClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean getConvertPipelineAndTxResults() {
return true;
}
@Override
public RedisSentinelConnection getSentinelConnection() {
// TODO Auto-generated method stub
return null;
}
}

@ -0,0 +1,61 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.RedisTimeoutException;
import org.springframework.core.convert.converter.Converter;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.ClusterRedirectException;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.RedisSystemException;
/**
* Converts Redisson exceptions to Spring compatible
*
* @author Nikita Koksharov
*
*/
public class RedissonExceptionConverter implements Converter<Exception, DataAccessException> {
@Override
public DataAccessException convert(Exception source) {
if (source instanceof RedisConnectionException) {
return new RedisConnectionFailureException(source.getMessage(), source);
}
if (source instanceof RedisRedirectException) {
RedisRedirectException ex = (RedisRedirectException) source;
return new ClusterRedirectException(ex.getSlot(), ex.getUrl().getHost(), ex.getUrl().getPort(), source);
}
if (source instanceof RedisException) {
return new RedisSystemException(source.getMessage(), source);
}
if (source instanceof DataAccessException) {
return (DataAccessException) source;
}
if (source instanceof RedisTimeoutException) {
return new QueryTimeoutException(source.getMessage(), source);
}
return null;
}
}

@ -0,0 +1,53 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.springframework.data.redis.connection.DefaultTuple;
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
/**
*
* @author Nikita Koksharov
*
*/
public class ScoredSortedSetReplayDecoder implements MultiDecoder<Set<Tuple>> {
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
@Override
public Set<Tuple> decode(List<Object> parts, State state) {
Set<Tuple> result = new LinkedHashSet<Tuple>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new DefaultTuple((byte[])parts.get(i), ((Number)parts.get(i+1)).doubleValue()));
}
return result;
}
}

@ -0,0 +1,41 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.SingleConvertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor extends SingleConvertor<Long> {
private final TimeUnit unit;
public SecondsConvertor(TimeUnit unit) {
super();
this.unit = unit;
}
@Override
public Long convert(Object obj) {
return unit.convert((Long)obj, TimeUnit.SECONDS);
}
}

@ -0,0 +1,50 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
/**
*
* @author Nikita Koksharov
*
*/
public class SetReplayDecoder<T> implements MultiDecoder<Set<T>> {
private final Decoder<Object> decoder;
public SetReplayDecoder(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
@Override
public Set<T> decode(List<Object> parts, State state) {
return new LinkedHashSet(parts);
}
}
Loading…
Cancel
Save