diff --git a/redisson/pom.xml b/redisson/pom.xml index c4c214925..b9436a987 100644 --- a/redisson/pom.xml +++ b/redisson/pom.xml @@ -267,6 +267,13 @@ provided true + + org.springframework.data + spring-data-redis + 1.8.13.RELEASE + provided + true + org.springframework.session spring-session diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 6a3c0bf1d..82025b986 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -79,7 +79,6 @@ import org.redisson.cluster.ClusterNodeInfo; public interface RedisCommands { RedisStrictCommand GEOADD = new RedisStrictCommand("GEOADD"); - RedisStrictCommand GEOADD_ENTRIES = new RedisStrictCommand("GEOADD"); RedisCommand GEODIST = new RedisCommand("GEODIST", new DoubleReplayConvertor()); RedisCommand> GEORADIUS = new RedisCommand>("GEORADIUS", new ObjectListReplayDecoder()); RedisCommand> GEORADIUSBYMEMBER = new RedisCommand>("GEORADIUSBYMEMBER", new ObjectListReplayDecoder()); @@ -111,6 +110,7 @@ public interface RedisCommands { RedisCommand ZADD_RAW = new RedisCommand("ZADD"); RedisStrictCommand ZADD_INT = new RedisStrictCommand("ZADD", new IntegerReplayConvertor()); RedisCommand ZADD = new RedisCommand("ZADD"); + RedisStrictCommand ZREM_LONG = new RedisStrictCommand("ZREM"); RedisCommand ZREM = new RedisCommand("ZREM", new BooleanAmountReplayConvertor()); RedisStrictCommand ZCARD_INT = new RedisStrictCommand("ZCARD", new IntegerReplayConvertor()); RedisStrictCommand ZCARD = new RedisStrictCommand("ZCARD"); @@ -118,7 +118,9 @@ public interface RedisCommands { RedisStrictCommand ZLEXCOUNT = new RedisStrictCommand("ZLEXCOUNT", new IntegerReplayConvertor()); RedisCommand ZSCORE_CONTAINS = new RedisCommand("ZSCORE", new BooleanNotNullReplayConvertor()); RedisStrictCommand ZSCORE = new RedisStrictCommand("ZSCORE", new DoubleReplayConvertor()); + RedisStrictCommand ZRANK = new RedisStrictCommand("ZRANK"); RedisCommand ZRANK_INT = new RedisCommand("ZRANK", new IntegerReplayConvertor()); + RedisStrictCommand ZREVRANK = new RedisStrictCommand("ZREVRANK"); RedisCommand ZREVRANK_INT = new RedisCommand("ZREVRANK", new IntegerReplayConvertor()); RedisCommand ZRANGE_SINGLE = new RedisCommand("ZRANGE", new ListFirstObjectDecoder()); RedisStrictCommand ZRANGE_SINGLE_SCORE = new RedisStrictCommand("ZRANGE", new ObjectFirstScoreReplayDecoder()); @@ -149,6 +151,7 @@ public interface RedisCommands { RedisStrictCommand UNWATCH = new RedisStrictCommand("UNWATCH", new VoidReplayConvertor()); RedisStrictCommand WATCH = new RedisStrictCommand("WATCH", new VoidReplayConvertor()); RedisStrictCommand MULTI = new RedisStrictCommand("MULTI", new VoidReplayConvertor()); + RedisStrictCommand DISCARD = new RedisStrictCommand("DISCARD", new VoidReplayConvertor()); RedisCommand> EXEC = new RedisCommand>("EXEC", new ObjectListReplayDecoder()); RedisCommand SADD_BOOL = new RedisCommand("SADD", new BooleanAmountReplayConvertor()); @@ -159,6 +162,7 @@ public interface RedisCommands { RedisCommand SREM_SINGLE = new RedisCommand("SREM", new BooleanAmountReplayConvertor()); RedisCommand SMOVE = new RedisCommand("SMOVE", new BooleanReplayConvertor()); RedisCommand> SMEMBERS = new RedisCommand>("SMEMBERS", new ObjectSetReplayDecoder()); + RedisCommand> SRANDMEMBER = new RedisCommand>("SRANDMEMBER", new ObjectListReplayDecoder()); RedisCommand SRANDMEMBER_SINGLE = new RedisCommand("SRANDMEMBER"); RedisCommand> SSCAN = new RedisCommand>("SSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); @@ -182,7 +186,6 @@ public interface RedisCommands { RedisCommand LINDEX = new RedisCommand("LINDEX"); RedisCommand LINSERT_INT = new RedisCommand("LINSERT", new IntegerReplayConvertor()); RedisStrictCommand LLEN_INT = new RedisStrictCommand("LLEN", new IntegerReplayConvertor()); - RedisStrictCommand LLEN = new RedisStrictCommand("LLEN"); RedisStrictCommand LTRIM = new RedisStrictCommand("LTRIM", new VoidReplayConvertor()); RedisStrictCommand PEXPIRE = new RedisStrictCommand("PEXPIRE", new BooleanReplayConvertor()); @@ -192,13 +195,16 @@ public interface RedisCommands { RedisCommand RPOPLPUSH = new RedisCommand("RPOPLPUSH"); RedisCommand BRPOPLPUSH = new RedisCommand("BRPOPLPUSH"); + RedisCommand> BLPOP = new RedisCommand>("BLPOP", new ObjectListReplayDecoder()); + RedisCommand> BRPOP = new RedisCommand>("BRPOP", new ObjectListReplayDecoder()); RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new ListObjectDecoder(1)); RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new ListObjectDecoder(1)); RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); Set BLOCKING_COMMANDS = new HashSet( - Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName())); + Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName(), + BLPOP.getName(), BRPOP.getName())); RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor()); RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); @@ -208,7 +214,7 @@ public interface RedisCommands { RedisCommand> SORT_SET = new RedisCommand>("SORT", new ObjectSetReplayDecoder()); RedisCommand SORT_TO = new RedisCommand("SORT", new IntegerReplayConvertor()); - RedisStrictCommand RPOP = new RedisStrictCommand("RPOP"); + RedisCommand RPOP = new RedisCommand("RPOP"); RedisCommand LPUSH = new RedisCommand("LPUSH", new IntegerReplayConvertor()); RedisCommand LPUSH_BOOLEAN = new RedisCommand("LPUSH", new TrueReplayConvertor()); RedisStrictCommand LPUSH_VOID = new RedisStrictCommand("LPUSH", new VoidReplayConvertor()); @@ -297,9 +303,6 @@ public interface RedisCommands { RedisStrictCommand GET_INTEGER = new RedisStrictCommand("GET", new IntegerReplayConvertor()); RedisStrictCommand GET_DOUBLE = new RedisStrictCommand("GET", new DoubleNullSafeReplayConvertor()); RedisCommand GETSET = new RedisCommand("GETSET"); - RedisCommand GETRANGE = new RedisCommand("GETRANGE"); - RedisCommand APPEND = new RedisCommand("APPEND"); - RedisCommand SETRANGE = new RedisCommand("SETRANGE"); RedisCommand SET = new RedisCommand("SET", new VoidReplayConvertor()); RedisCommand SETPXNX = new RedisCommand("SET", new BooleanNotNullReplayConvertor()); RedisCommand SETNX = new RedisCommand("SETNX", new BooleanReplayConvertor()); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 083407c32..196c5ada6 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -18,6 +18,7 @@ package org.redisson.cluster; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -61,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.resolver.AddressResolver; +import io.netty.util.CharsetUtil; import io.netty.util.NetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -672,6 +674,31 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return configEndpointHostName; } + private int indexOf(byte[] array, byte element) { + for (int i = 0; i < array.length + 1; ++i) { + if (array[i] == element) { + return i; + } + } + return -1; + } + + @Override + public int calcSlot(byte[] key) { + if (key == null) { + return 0; + } + + int start = indexOf(key, (byte)'{'); + if (start != -1) { + int end = indexOf(key, (byte)'}'); + key = Arrays.copyOfRange(key, start+1, end); + } + + int result = CRC16.crc16(key) % MAX_SLOT; + return result; + } + @Override public int calcSlot(String key) { if (key == null) { diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 9121575f9..7cf0831fc 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -61,6 +61,8 @@ public interface CommandAsyncExecutor { RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object ... params); + RFuture readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand command, Object... params); + RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object ... params); RFuture evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params); @@ -79,12 +81,18 @@ public interface CommandAsyncExecutor { RFuture evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + RFuture readAsync(byte[] key, Codec codec, RedisCommand command, Object... params); + RFuture readAsync(String key, Codec codec, RedisCommand command, Object ... params); RFuture writeAsync(String key, Codec codec, RedisCommand command, Object ... params); + , T> RFuture readAllAsync(RedisCommand command, R results, Object... params); + RFuture> readAllAsync(RedisCommand command, Object ... params); + RFuture writeAllAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... params); + RFuture writeAllAsync(RedisCommand command, Object ... params); RFuture writeAsync(String key, RedisCommand command, Object ... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 975a8282f..3ae838c41 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -210,6 +210,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null); return mainPromise; } + + public RFuture readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand command, Object... params) { + RPromise mainPromise = createPromise(); + int slot = connectionManager.calcSlot(key); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null); + return mainPromise; + } @Override public RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params) { @@ -220,19 +227,25 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { - final RPromise> mainPromise = createPromise(); + List results = new ArrayList(); + return readAllAsync(command, results, params); + } + + @Override + public , T> RFuture readAllAsync(RedisCommand command, final R results, + Object... params) { + final RPromise mainPromise = createPromise(); final Collection nodes = connectionManager.getEntrySet(); - final List results = new ArrayList(); final AtomicInteger counter = new AtomicInteger(nodes.size()); - FutureListener listener = new FutureListener() { + FutureListener listener = new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) { mainPromise.tryFailure(future.cause()); return; } - R result = future.getNow(); + Object result = future.getNow(); if (result instanceof Collection) { synchronized (results) { results.addAll((Collection) result); @@ -301,15 +314,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture writeAllAsync(RedisCommand command, SlotCallback callback, Object... params) { - return allAsync(false, command, callback, params); + return allAsync(false, connectionManager.getCodec(), command, callback, params); } + @Override + public RFuture writeAllAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... params) { + return allAsync(false, codec, command, callback, params); + } + @Override public RFuture readAllAsync(RedisCommand command, SlotCallback callback, Object... params) { - return allAsync(true, command, callback, params); + return allAsync(true, connectionManager.getCodec(), command, callback, params); } - private RFuture allAsync(boolean readOnlyMode, final RedisCommand command, final SlotCallback callback, Object... params) { + private RFuture allAsync(boolean readOnlyMode, Codec codec, final RedisCommand command, final SlotCallback callback, Object... params) { final RPromise mainPromise = new RedissonPromise(); final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); @@ -342,7 +360,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null); + async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true, null); } return mainPromise; } @@ -359,6 +377,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { return new NodeSource(entry); } + private NodeSource getNodeSource(byte[] key) { + int slot = connectionManager.calcSlot(key); + MasterSlaveEntry entry = connectionManager.getEntry(slot); + return new NodeSource(entry); + } + @Override public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); @@ -366,6 +390,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { async(true, source, codec, command, params, mainPromise, 0, false, null); return mainPromise; } + + @Override + public RFuture readAsync(byte[] key, Codec codec, RedisCommand command, Object... params) { + RPromise mainPromise = createPromise(); + NodeSource source = getNodeSource(key); + async(true, source, codec, command, params, mainPromise, 0, false, null); + return mainPromise; + } public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); @@ -475,6 +507,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } + public RFuture writeAsync(byte[] key, Codec codec, RedisCommand command, Object... params) { + RPromise mainPromise = createPromise(); + NodeSource source = getNodeSource(key); + async(false, source, codec, command, params, mainPromise, 0, false, null); + return mainPromise; + } + protected void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt, final boolean ignoreRedirect, final RFuture connFuture) { @@ -877,7 +916,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { ((ScanResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient()); } - handleSuccess(details.getMainPromise(), details.getCommand(), res); + handleSuccess(details, details.getMainPromise(), details.getCommand(), res); } else { handleError(details, details.getMainPromise(), future.cause()); } @@ -893,7 +932,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { mainPromise.tryFailure(cause); } - protected void handleSuccess(RPromise promise, RedisCommand command, R res) { + protected void handleSuccess(AsyncDetails details, RPromise promise, RedisCommand command, R res) { if (isRedissonReferenceSupportEnabled()) { handleReference(promise, res); } else { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 8bbfdf195..267fece6f 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -143,6 +143,10 @@ public class CommandBatchService extends CommandAsyncService { super(connectionManager); this.options = options; } + + public BatchOptions getOptions() { + return options; + } public void add(RFuture future, List services) { nestedServices.put(future, services); @@ -214,16 +218,28 @@ public class CommandBatchService extends CommandAsyncService { } @Override - protected void handleSuccess(RPromise promise, RedisCommand command, R res) { + protected void handleSuccess(final AsyncDetails details, RPromise promise, RedisCommand command, R res) { if (RedisCommands.EXEC.getName().equals(command.getName())) { - super.handleSuccess(promise, command, res); + super.handleSuccess(details, promise, command, res); + return; + } + if (RedisCommands.DISCARD.getName().equals(command.getName())) { + super.handleSuccess(details, promise, command, null); + if (executed.compareAndSet(false, true)) { + details.getConnectionFuture().getNow().forceFastReconnectAsync().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + CommandBatchService.super.releaseConnection(details.getSource(), details.getConnectionFuture(), details.isReadOnlyMode(), details.getAttemptPromise(), details); + } + }); + } return; } if (isRedisBasedQueue()) { BatchPromise batchPromise = (BatchPromise) promise; RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); - super.handleSuccess(sentPromise, command, null); + super.handleSuccess(details, sentPromise, command, null); semaphore.release(); } } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 21e5e47fc..b2a24ffb1 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -68,6 +68,8 @@ public interface ConnectionManager { IdleConnectionWatcher getConnectionWatcher(); int calcSlot(String key); + + int calcSlot(byte[] key); MasterSlaveServersConfig getConfig(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 5c9d88d6b..17a8e6bac 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -471,6 +471,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return singleSlotRange.getStartSlot(); } + @Override + public int calcSlot(byte[] key) { + return singleSlotRange.getStartSlot(); + } @Override public MasterSlaveEntry getEntry(InetSocketAddress address) { diff --git a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 71c1a2d8b..b698f690d 100644 --- a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -33,8 +33,11 @@ import org.redisson.client.SubscribeListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.pubsub.PubSubType; -import io.netty.util.concurrent.Future; - +/** + * + * @author Nikita Koksharov + * + */ public class PubSubConnectionEntry { private final AtomicInteger subscribedChannelsAmount; diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java b/redisson/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java new file mode 100644 index 000000000..536c43006 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java @@ -0,0 +1,34 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import org.redisson.client.protocol.convertor.SingleConvertor; +import org.springframework.data.redis.connection.DataType; + +/** + * + * @author Nikita Koksharov + * + */ +public class DataTypeConvertor extends SingleConvertor { + + @Override + public DataType convert(Object obj) { + String val = obj.toString(); + return DataType.fromCode(val); + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java b/redisson/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java new file mode 100644 index 000000000..c44ce44d8 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java @@ -0,0 +1,41 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import org.redisson.client.protocol.convertor.SingleConvertor; +import org.springframework.data.geo.Distance; +import org.springframework.data.geo.Metric; + +/** + * + * @author Nikita Koksharov + * + */ +public class DistanceConvertor extends SingleConvertor { + + private final Metric metric; + + public DistanceConvertor(Metric metric) { + super(); + this.metric = metric; + } + + @Override + public Distance convert(Object obj) { + return new Distance((Double)obj, metric); + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/GeoResultsDecoder.java b/redisson/src/main/java/org/redisson/spring/data/connection/GeoResultsDecoder.java new file mode 100644 index 000000000..2783ce516 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/GeoResultsDecoder.java @@ -0,0 +1,77 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import java.util.ArrayList; +import java.util.List; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder; +import org.redisson.client.protocol.decoder.MultiDecoder; +import org.springframework.data.geo.Distance; +import org.springframework.data.geo.GeoResult; +import org.springframework.data.geo.GeoResults; +import org.springframework.data.geo.Metric; +import org.springframework.data.geo.Point; +import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation; + +/** + * + * @author Nikita Koksharov + * + */ +public class GeoResultsDecoder implements MultiDecoder>> { + + private final Metric metric; + + public GeoResultsDecoder() { + this(null); + } + + public GeoResultsDecoder(Metric metric) { + super(); + this.metric = metric; + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + return ListMultiDecoder.RESET; + } + + @Override + public GeoResults> decode(List parts, State state) { + List>> result = new ArrayList>>(); + for (Object object : parts) { + if (object instanceof List) { + List vals = ((List) object); + + if (metric != null) { + GeoLocation location = new GeoLocation((byte[])vals.get(0), null); + result.add(new GeoResult>(location, new Distance((Double)vals.get(1), metric))); + } else { + GeoLocation location = new GeoLocation((byte[])vals.get(0), (Point)vals.get(1)); + result.add(new GeoResult>(location, null)); + } + } else { + GeoLocation location = new GeoLocation((byte[])object, null); + result.add(new GeoResult>(location, null)); + } + } + return new GeoResults>(result); + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/GeoResultsDistanceDecoder.java b/redisson/src/main/java/org/redisson/spring/data/connection/GeoResultsDistanceDecoder.java new file mode 100644 index 000000000..257c2da4a --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/GeoResultsDistanceDecoder.java @@ -0,0 +1,54 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import java.util.ArrayList; +import java.util.List; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder; +import org.redisson.client.protocol.decoder.MultiDecoder; +import org.springframework.data.geo.GeoResult; +import org.springframework.data.geo.GeoResults; +import org.springframework.data.geo.Point; +import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation; + +/** + * + * @author Nikita Koksharov + * + */ +public class GeoResultsDistanceDecoder implements MultiDecoder>> { + + @Override + public Decoder getDecoder(int paramNum, State state) { + return ListMultiDecoder.RESET; + } + + @Override + public GeoResults> decode(List parts, State state) { + List>> result = new ArrayList>>(); + for (Object object : parts) { + List vals = ((List) object); + + GeoLocation location = new GeoLocation((byte[])vals.get(0), (Point)vals.get(1)); + result.add(new GeoResult>(location, null)); + } + return new GeoResults>(result); + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/PointDecoder.java b/redisson/src/main/java/org/redisson/spring/data/connection/PointDecoder.java new file mode 100644 index 000000000..72f70a4fa --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/PointDecoder.java @@ -0,0 +1,49 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import java.util.List; + +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.decoder.MultiDecoder; +import org.springframework.data.geo.Point; + +/** + * + * @author Nikita Koksharov + * + */ +public class PointDecoder implements MultiDecoder { + + @Override + public Decoder getDecoder(int paramNum, State state) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } + + @Override + public Point decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } + + Double longitude = (Double)parts.get(0); + Double latitude = (Double)parts.get(1); + return new Point(longitude, latitude); + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java new file mode 100644 index 000000000..65018f820 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonClusterConnection.java @@ -0,0 +1,252 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.redisson.api.RedissonClient; +import org.springframework.data.redis.connection.ClusterInfo; +import org.springframework.data.redis.connection.RedisClusterConnection; +import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.connection.RedisClusterNode.SlotRange; +import org.springframework.data.redis.core.types.RedisClientInfo; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonClusterConnection extends RedissonConnection implements RedisClusterConnection { + + public RedissonClusterConnection(RedissonClient redisson) { + super(redisson); + } + + @Override + public Iterable clusterGetNodes() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Collection clusterGetSlaves(RedisClusterNode master) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map> clusterGetMasterSlaveMap() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Integer clusterGetSlotForKey(byte[] key) { + // TODO Auto-generated method stub + return null; + } + + @Override + public RedisClusterNode clusterGetNodeForSlot(int slot) { + // TODO Auto-generated method stub + return null; + } + + @Override + public RedisClusterNode clusterGetNodeForKey(byte[] key) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ClusterInfo clusterGetClusterInfo() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void clusterAddSlots(RedisClusterNode node, int... slots) { + // TODO Auto-generated method stub + + } + + @Override + public void clusterAddSlots(RedisClusterNode node, SlotRange range) { + // TODO Auto-generated method stub + + } + + @Override + public Long clusterCountKeysInSlot(int slot) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void clusterDeleteSlots(RedisClusterNode node, int... slots) { + // TODO Auto-generated method stub + + } + + @Override + public void clusterDeleteSlotsInRange(RedisClusterNode node, SlotRange range) { + // TODO Auto-generated method stub + + } + + @Override + public void clusterForget(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public void clusterMeet(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public void clusterSetSlot(RedisClusterNode node, int slot, AddSlots mode) { + // TODO Auto-generated method stub + + } + + @Override + public List clusterGetKeysInSlot(int slot, Integer count) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void clusterReplicate(RedisClusterNode master, RedisClusterNode slave) { + // TODO Auto-generated method stub + + } + + @Override + public String ping(RedisClusterNode node) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void bgReWriteAof(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public void bgSave(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public Long lastSave(RedisClusterNode node) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void save(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public Long dbSize(RedisClusterNode node) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void flushDb(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public void flushAll(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public Properties info(RedisClusterNode node) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Properties info(RedisClusterNode node, String section) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Set keys(RedisClusterNode node, byte[] pattern) { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[] randomKey(RedisClusterNode node) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void shutdown(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public List getConfig(RedisClusterNode node, String pattern) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setConfig(RedisClusterNode node, String param, String value) { + // TODO Auto-generated method stub + + } + + @Override + public void resetConfigStats(RedisClusterNode node) { + // TODO Auto-generated method stub + + } + + @Override + public Long time(RedisClusterNode node) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getClientList(RedisClusterNode node) { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java new file mode 100644 index 000000000..cf08f364c --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -0,0 +1,1917 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import static org.redisson.client.protocol.RedisCommands.LRANGE; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.Redisson; +import org.redisson.SlotCallback; +import org.redisson.api.BatchOptions; +import org.redisson.api.BatchOptions.ExecutionMode; +import org.redisson.api.BatchResult; +import org.redisson.api.RFuture; +import org.redisson.api.RedissonClient; +import org.redisson.client.RedisClient; +import org.redisson.client.RedisException; +import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.Time; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; +import org.redisson.client.protocol.decoder.CodecDecoder; +import org.redisson.client.protocol.decoder.GeoDistanceDecoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.LongMultiDecoder; +import org.redisson.client.protocol.decoder.MapScanResult; +import org.redisson.client.protocol.decoder.MultiDecoder; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; +import org.redisson.command.CommandAsyncService; +import org.redisson.command.CommandBatchService; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.springframework.dao.DataAccessException; +import org.springframework.dao.InvalidDataAccessApiUsageException; +import org.springframework.data.geo.Circle; +import org.springframework.data.geo.Distance; +import org.springframework.data.geo.GeoResults; +import org.springframework.data.geo.Metric; +import org.springframework.data.geo.Point; +import org.springframework.data.redis.connection.AbstractRedisConnection; +import org.springframework.data.redis.connection.DataType; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisNode; +import org.springframework.data.redis.connection.RedisPipelineException; +import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.connection.SortParameters; +import org.springframework.data.redis.connection.Subscription; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.KeyBoundCursor; +import org.springframework.data.redis.core.ScanCursor; +import org.springframework.data.redis.core.ScanIteration; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.types.Expiration; +import org.springframework.data.redis.core.types.RedisClientInfo; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +/** + * Redisson connection + * + * @author Nikita Koksharov + * + */ +public class RedissonConnection extends AbstractRedisConnection { + + private boolean closed; + private final Redisson redisson; + + private CommandAsyncService executorService; + + public RedissonConnection(RedissonClient redisson) { + super(); + this.redisson = (Redisson) redisson; + executorService = (CommandAsyncService) this.redisson.getCommandExecutor(); + } + + @Override + public void close() throws DataAccessException { + super.close(); + + closed = true; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public Object getNativeConnection() { + return redisson; + } + + @Override + public boolean isQueueing() { + if (executorService instanceof CommandBatchService) { + CommandBatchService es = (CommandBatchService) executorService; + return es.getOptions().getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC; + } + return false; + } + + @Override + public boolean isPipelined() { + if (executorService instanceof CommandBatchService) { + CommandBatchService es = (CommandBatchService) executorService; + return es.getOptions().getExecutionMode() == ExecutionMode.IN_MEMORY; + } + return false; + } + + @Override + public void openPipeline() { + BatchOptions options = BatchOptions.defaults() + .executionMode(ExecutionMode.IN_MEMORY); + this.executorService = new CommandBatchService(redisson.getConnectionManager(), options); + } + + @Override + public List closePipeline() throws RedisPipelineException { + if (isPipelined()) { + CommandBatchService es = (CommandBatchService) executorService; + BatchResult result = es.execute(); + return (List) result.getResponses(); + } else { + throw new InvalidDataAccessApiUsageException("Not in pipeline mode. Please invoke multi method"); + } + } + + @Override + public Object execute(String command, byte[]... args) { + return write(null, ByteArrayCodec.INSTANCE, new RedisCommand(command), args); + } + + private V get(RFuture future) { + return executorService.get(future); + } + + @Override + public Boolean exists(byte[] key) { + return read(key, StringCodec.INSTANCE, RedisCommands.EXISTS, key); + } + + private void checkExecution(final RPromise result, final AtomicReference failed, + final AtomicLong count, final AtomicLong executed) { + if (executed.decrementAndGet() == 0) { + if (failed.get() != null) { + if (count.get() > 0) { + RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); + result.tryFailure(ex); + } else { + result.tryFailure(failed.get()); + } + } else { + result.trySuccess(count.get()); + } + } + } + + private RFuture executeAsync(RedisStrictCommand command, byte[] ... keys) { + if (!executorService.getConnectionManager().isClusterMode()) { + return executorService.writeAsync(null, command, keys); + } + + Map> range2key = new HashMap>(); + for (byte[] key : keys) { + int slot = executorService.getConnectionManager().calcSlot(key); + MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot); + List list = range2key.get(entry); + if (list == null) { + list = new ArrayList(); + range2key.put(entry, list); + } + list.add(key); + } + + final RPromise result = new RedissonPromise(); + final AtomicReference failed = new AtomicReference(); + final AtomicLong count = new AtomicLong(); + final AtomicLong executed = new AtomicLong(range2key.size()); + FutureListener> listener = new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (future.isSuccess()) { + List result = (List) future.get(); + for (Long res : result) { + if (res != null) { + count.addAndGet(res); + } + } + } else { + failed.set(future.cause()); + } + + checkExecution(result, failed, count, executed); + } + }; + + for (Entry> entry : range2key.entrySet()) { + CommandBatchService es = new CommandBatchService(executorService.getConnectionManager()); + for (byte[] key : entry.getValue()) { + es.writeAsync(entry.getKey(), null, command, key); + } + + RFuture> future = es.executeAsync(); + future.addListener(listener); + } + + return result; + } + + @Override + public Long del(byte[]... keys) { + return get(executeAsync(RedisCommands.DEL, keys)); + } + + RedisStrictCommand TYPE = new RedisStrictCommand("TYPE", new DataTypeConvertor()); + + @Override + public DataType type(byte[] key) { + RFuture f = executorService.readAsync(key, StringCodec.INSTANCE, TYPE, key); + return get(f); + } + + private final RedisStrictCommand> KEYS = new RedisStrictCommand>("KEYS", new SetReplayDecoder(ByteArrayCodec.INSTANCE.getValueDecoder())); + + @Override + public Set keys(byte[] pattern) { + Set results = new HashSet(); + RFuture> f = (RFuture>)(Object)(executorService.readAllAsync(KEYS, results, pattern)); + return get(f); + } + + @Override + public Cursor scan(ScanOptions options) { + return new ScanCursor(0, options) { + + private RedisClient client; + private Iterator entries; + private MasterSlaveEntry entry; + + { + entries = redisson.getConnectionManager().getEntrySet().iterator(); + entry = entries.next(); + } + + @Override + protected ScanIteration doScan(long cursorId, ScanOptions options) { + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'SSCAN' cannot be called in pipeline / transaction mode."); + } + + if (entry == null) { + return null; + } + + List args = new ArrayList(); + // to avoid negative value + cursorId = Math.max(cursorId, 0); + args.add(cursorId); + if (options.getPattern() != null) { + args.add("MATCH"); + args.add(options.getPattern()); + } + if (options.getCount() != null) { + args.add("COUNT"); + args.add(options.getCount()); + } + + RFuture> f = executorService.readAsync(client, entry, ByteArrayCodec.INSTANCE, RedisCommands.SCAN, args.toArray()); + ListScanResult res = get(f); + long pos = res.getPos(); + client = res.getRedisClient(); + if (pos == 0) { + if (entries.hasNext()) { + pos = -1; + entry = entries.next(); + } else { + entry = null; + } + } + + return new ScanIteration(pos, res.getValues()); + } + }.open(); + } + + @Override + public byte[] randomKey() { + RFuture f = executorService.readRandomAsync(RedisCommands.RANDOM_KEY); + return get(f); + } + + @Override + public void rename(byte[] oldName, byte[] newName) { + write(oldName, StringCodec.INSTANCE, RedisCommands.RENAME, oldName, newName); + } + + @Override + public Boolean renameNX(byte[] oldName, byte[] newName) { + return write(oldName, StringCodec.INSTANCE, RedisCommands.RENAMENX, oldName, newName); + } + + RedisStrictCommand EXPIRE = new RedisStrictCommand("EXPIRE", new BooleanReplayConvertor()); + + @Override + public Boolean expire(byte[] key, long seconds) { + return write(key, StringCodec.INSTANCE, EXPIRE, key, seconds); + } + + @Override + public Boolean pExpire(byte[] key, long millis) { + return write(key, StringCodec.INSTANCE, RedisCommands.PEXPIRE, key, millis); + } + + RedisStrictCommand EXPIREAT = new RedisStrictCommand("EXPIREAT", new BooleanReplayConvertor()); + + @Override + public Boolean expireAt(byte[] key, long unixTime) { + return write(key, StringCodec.INSTANCE, EXPIREAT, key, unixTime); + } + + @Override + public Boolean pExpireAt(byte[] key, long unixTimeInMillis) { + return write(key, StringCodec.INSTANCE, RedisCommands.PEXPIREAT, key, unixTimeInMillis); + } + + @Override + public Boolean persist(byte[] key) { + return write(key, StringCodec.INSTANCE, RedisCommands.PERSIST, key); + } + + @Override + public Boolean move(byte[] key, int dbIndex) { + return write(key, StringCodec.INSTANCE, RedisCommands.MOVE, key, dbIndex); + } + + private static final RedisStrictCommand TTL = new RedisStrictCommand("TTL"); + + @Override + public Long ttl(byte[] key) { + return read(key, StringCodec.INSTANCE, TTL, key); + } + + @Override + public Long ttl(byte[] key, TimeUnit timeUnit) { + return read(key, StringCodec.INSTANCE, new RedisStrictCommand("TTL", new SecondsConvertor(timeUnit)), key); + } + + @Override + public Long pTtl(byte[] key) { + return read(key, StringCodec.INSTANCE, RedisCommands.PTTL, key); + } + + @Override + public Long pTtl(byte[] key, TimeUnit timeUnit) { + return read(key, StringCodec.INSTANCE, new RedisStrictCommand("PTTL", new SecondsConvertor(timeUnit)), key); + } + + @Override + public List sort(byte[] key, SortParameters sortParams) { + List params = new ArrayList(); + params.add(key); + if (sortParams.getByPattern() != null) { + params.add("BY"); + params.add(sortParams.getByPattern()); + } + + if (sortParams.getLimit() != null) { + params.add("LIMIT"); + if (sortParams.getLimit().getStart() != -1) { + params.add(sortParams.getLimit().getStart()); + } + if (sortParams.getLimit().getCount() != -1) { + params.add(sortParams.getLimit().getCount()); + } + } + + if (sortParams.getGetPattern() != null) { + for (byte[] pattern : sortParams.getGetPattern()) { + params.add("GET"); + params.add(pattern); + } + } + + if (sortParams.getOrder() != null) { + params.add(sortParams.getOrder()); + } + + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.SORT_LIST, params.toArray()); + } + + private static final RedisCommand SORT_TO = new RedisCommand("SORT"); + + @Override + public Long sort(byte[] key, SortParameters sortParams, byte[] storeKey) { + List params = new ArrayList(); + params.add(key); + if (sortParams.getByPattern() != null) { + params.add("BY"); + params.add(sortParams.getByPattern()); + } + + if (sortParams.getLimit() != null) { + params.add("LIMIT"); + if (sortParams.getLimit().getStart() != -1) { + params.add(sortParams.getLimit().getStart()); + } + if (sortParams.getLimit().getCount() != -1) { + params.add(sortParams.getLimit().getCount()); + } + } + + if (sortParams.getGetPattern() != null) { + for (byte[] pattern : sortParams.getGetPattern()) { + params.add("GET"); + params.add(pattern); + } + } + + if (sortParams.getOrder() != null) { + params.add(sortParams.getOrder()); + } + + params.add("STORE"); + params.add(storeKey); + + return read(key, ByteArrayCodec.INSTANCE, SORT_TO, params.toArray()); + } + + @Override + public byte[] dump(byte[] key) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.DUMP, key); + } + + @Override + public void restore(byte[] key, long ttlInMillis, byte[] serializedValue) { + write(key, StringCodec.INSTANCE, RedisCommands.RESTORE, key, ttlInMillis, serializedValue); + } + + @Override + public byte[] get(byte[] key) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key); + } + + @Override + public byte[] getSet(byte[] key, byte[] value) { + return write(key, ByteArrayCodec.INSTANCE, RedisCommands.GETSET, key, value); + } + + private static final RedisCommand> MGET = new RedisCommand>("MGET", new ObjectListReplayDecoder()); + + @Override + public List mGet(byte[]... keys) { + return write(keys[0], ByteArrayCodec.INSTANCE, MGET, (Object[])keys); + } + + @Override + public void set(byte[] key, byte[] value) { + write(key, StringCodec.INSTANCE, RedisCommands.SET, key, value); + } + + @Override + public void set(byte[] key, byte[] value, Expiration expiration, SetOption option) { + if (expiration.isPersistent()) { + if (option == SetOption.UPSERT) { + set(key, value); + } + if (option == SetOption.SET_IF_ABSENT) { + write(key, StringCodec.INSTANCE, RedisCommands.SET, key, value, "NX"); + } + if (option == SetOption.SET_IF_ABSENT) { + write(key, StringCodec.INSTANCE, RedisCommands.SET, key, value, "XX"); + } + } else { + if (option == SetOption.UPSERT) { + write(key, StringCodec.INSTANCE, RedisCommands.SET, key, value, "PX", expiration.getExpirationTimeInMilliseconds()); + } + if (option == SetOption.SET_IF_ABSENT) { + write(key, StringCodec.INSTANCE, RedisCommands.SET, key, value, "PX", expiration.getExpirationTimeInMilliseconds(), "NX"); + } + if (option == SetOption.SET_IF_ABSENT) { + write(key, StringCodec.INSTANCE, RedisCommands.SET, key, value, "PX", expiration.getExpirationTimeInMilliseconds(), "XX"); + } + } + } + + @Override + public Boolean setNX(byte[] key, byte[] value) { + return write(key, StringCodec.INSTANCE, RedisCommands.SETNX, key, value); + } + + RedisCommand SETEX = new RedisCommand("SETEX", new VoidReplayConvertor()); + + @Override + public void setEx(byte[] key, long seconds, byte[] value) { + write(key, StringCodec.INSTANCE, RedisCommands.PSETEX, key, value); + } + + @Override + public void pSetEx(byte[] key, long milliseconds, byte[] value) { + write(key, StringCodec.INSTANCE, RedisCommands.PSETEX, key, milliseconds, value); + } + + @Override + public void mSet(Map tuple) { + List params = convert(tuple); + write(tuple.keySet().iterator().next(), StringCodec.INSTANCE, RedisCommands.MSET, params.toArray()); + } + + protected List convert(Map tuple) { + List params = new ArrayList(tuple.size()*2); + for (Entry entry : tuple.entrySet()) { + params.add(entry.getKey()); + params.add(entry.getValue()); + } + return params; + } + + @Override + public Boolean mSetNX(Map tuple) { + List params = convert(tuple); + return write(tuple.keySet().iterator().next(), StringCodec.INSTANCE, RedisCommands.MSETNX, params.toArray()); + } + + @Override + public Long incr(byte[] key) { + return write(key, StringCodec.INSTANCE, RedisCommands.INCR, key); + } + + @Override + public Long incrBy(byte[] key, long value) { + return write(key, StringCodec.INSTANCE, RedisCommands.INCRBY, key, value); + } + + @Override + public Double incrBy(byte[] key, double value) { + return write(key, StringCodec.INSTANCE, RedisCommands.INCRBYFLOAT, key, BigDecimal.valueOf(value).toPlainString()); + } + + @Override + public Long decr(byte[] key) { + return write(key, StringCodec.INSTANCE, RedisCommands.DECR, key); + } + + private static final RedisStrictCommand DECRBY = new RedisStrictCommand("DECRBY"); + + @Override + public Long decrBy(byte[] key, long value) { + return write(key, StringCodec.INSTANCE, DECRBY, key, value); + } + + private static final RedisStrictCommand APPEND = new RedisStrictCommand("APPEND"); + + @Override + public Long append(byte[] key, byte[] value) { + return write(key, StringCodec.INSTANCE, APPEND, key, value); + } + + private static final RedisCommand GETRANGE = new RedisCommand("GETRANGE"); + + @Override + public byte[] getRange(byte[] key, long begin, long end) { + return read(key, ByteArrayCodec.INSTANCE, GETRANGE, key, begin, end); + } + + private static final RedisCommand SETRANGE = new RedisCommand("SETRANGE", new VoidReplayConvertor()); + + @Override + public void setRange(byte[] key, byte[] value, long offset) { + write(key, ByteArrayCodec.INSTANCE, SETRANGE, key, value, offset); + } + + @Override + public Boolean getBit(byte[] key, long offset) { + return read(key, StringCodec.INSTANCE, RedisCommands.GETBIT, key, offset); + } + + @Override + public Boolean setBit(byte[] key, long offset, boolean value) { + return write(key, StringCodec.INSTANCE, RedisCommands.SETBIT, key, offset, value); + } + + @Override + public Long bitCount(byte[] key) { + return read(key, StringCodec.INSTANCE, RedisCommands.BITCOUNT, key); + } + + @Override + public Long bitCount(byte[] key, long begin, long end) { + return read(key, StringCodec.INSTANCE, RedisCommands.BITCOUNT, key, begin, end); + } + + @Override + public Long bitOp(BitOperation op, byte[] destination, byte[]... keys) { + List params = new ArrayList(keys.length + 2); + params.add(op); + params.add(destination); + params.addAll(Arrays.asList(keys)); + return write(keys[0], StringCodec.INSTANCE, RedisCommands.BITOP, params.toArray()); + } + + @Override + public Long strLen(byte[] key) { + return read(key, StringCodec.INSTANCE, RedisCommands.STRLEN, key); + } + + private static final RedisStrictCommand RPUSH = new RedisStrictCommand("RPUSH"); + + @Override + public Long rPush(byte[] key, byte[]... values) { + List args = new ArrayList(values.length + 1); + args.add(key); + args.addAll(Arrays.asList(values)); + return write(key, StringCodec.INSTANCE, RPUSH, args.toArray()); + } + + private static final RedisStrictCommand LPUSH = new RedisStrictCommand("LPUSH"); + + @Override + public Long lPush(byte[] key, byte[]... values) { + List args = new ArrayList(values.length + 1); + args.add(key); + args.addAll(Arrays.asList(values)); + return write(key, StringCodec.INSTANCE, LPUSH, args.toArray()); + } + + private static final RedisStrictCommand RPUSHX = new RedisStrictCommand("RPUSHX"); + + @Override + public Long rPushX(byte[] key, byte[] value) { + return write(key, StringCodec.INSTANCE, RPUSHX, key, value); + } + + private static final RedisStrictCommand LPUSHX = new RedisStrictCommand("LPUSHX"); + + @Override + public Long lPushX(byte[] key, byte[] value) { + return write(key, StringCodec.INSTANCE, LPUSHX, key, value); + } + + private static final RedisStrictCommand LLEN = new RedisStrictCommand("LLEN"); + + @Override + public Long lLen(byte[] key) { + return read(key, StringCodec.INSTANCE, LLEN, key); + } + + @Override + public List lRange(byte[] key, long start, long end) { + return read(key, ByteArrayCodec.INSTANCE, LRANGE, key, start, end); + } + + @Override + public void lTrim(byte[] key, long start, long end) { + write(key, StringCodec.INSTANCE, RedisCommands.LTRIM, key, start, end); + } + + @Override + public byte[] lIndex(byte[] key, long index) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.LINDEX, key, index); + } + + private static final RedisStrictCommand LINSERT = new RedisStrictCommand("LINSERT"); + + @Override + public Long lInsert(byte[] key, Position where, byte[] pivot, byte[] value) { + return write(key, StringCodec.INSTANCE, LINSERT, key, where, pivot, value); + } + + private T write(byte[] key, Codec codec, RedisCommand command, Object... params) { + RFuture f = executorService.writeAsync(key, codec, command, params); + return get(f); + } + + private T read(byte[] key, Codec codec, RedisCommand command, Object... params) { + RFuture f = executorService.readAsync(key, codec, command, params); + return get(f); + } + + @Override + public void lSet(byte[] key, long index, byte[] value) { + write(key, StringCodec.INSTANCE, RedisCommands.LSET, key, index, value); + } + + private static final RedisStrictCommand LREM = new RedisStrictCommand("LREM"); + + @Override + public Long lRem(byte[] key, long count, byte[] value) { + return write(key, StringCodec.INSTANCE, LREM, key, count, value); + } + + @Override + public byte[] lPop(byte[] key) { + return write(key, ByteArrayCodec.INSTANCE, RedisCommands.LPOP, key); + } + + @Override + public byte[] rPop(byte[] key) { + return write(key, ByteArrayCodec.INSTANCE, RedisCommands.RPOP, key); + } + + @Override + public List bLPop(int timeout, byte[]... keys) { + List params = new ArrayList(keys.length + 1); + params.addAll(Arrays.asList(keys)); + params.add(timeout); + return write(keys[0], ByteArrayCodec.INSTANCE, RedisCommands.BLPOP, params.toArray()); + } + + @Override + public List bRPop(int timeout, byte[]... keys) { + List params = new ArrayList(keys.length + 1); + params.addAll(Arrays.asList(keys)); + params.add(timeout); + return write(keys[0], ByteArrayCodec.INSTANCE, RedisCommands.BRPOP, params.toArray()); + } + + @Override + public byte[] rPopLPush(byte[] srcKey, byte[] dstKey) { + return write(srcKey, ByteArrayCodec.INSTANCE, RedisCommands.RPOPLPUSH, srcKey, dstKey); + } + + @Override + public byte[] bRPopLPush(int timeout, byte[] srcKey, byte[] dstKey) { + return write(srcKey, ByteArrayCodec.INSTANCE, RedisCommands.BRPOPLPUSH, srcKey, dstKey, timeout); + } + + private static final RedisCommand SADD = new RedisCommand("SADD"); + + @Override + public Long sAdd(byte[] key, byte[]... values) { + List args = new ArrayList(values.length + 1); + args.add(key); + args.addAll(Arrays.asList(values)); + + return write(key, StringCodec.INSTANCE, SADD, args.toArray()); + } + + private static final RedisStrictCommand SREM = new RedisStrictCommand("SREM"); + + @Override + public Long sRem(byte[] key, byte[]... values) { + List args = new ArrayList(values.length + 1); + args.add(key); + args.addAll(Arrays.asList(values)); + + return write(key, StringCodec.INSTANCE, SREM, args.toArray()); + } + + @Override + public byte[] sPop(byte[] key) { + return write(key, ByteArrayCodec.INSTANCE, RedisCommands.SPOP, key); + } + + @Override + public Boolean sMove(byte[] srcKey, byte[] destKey, byte[] value) { + return write(srcKey, StringCodec.INSTANCE, RedisCommands.SMOVE, srcKey, destKey, value); + } + + private static final RedisStrictCommand SCARD = new RedisStrictCommand("SCARD"); + + @Override + public Long sCard(byte[] key) { + return read(key, StringCodec.INSTANCE, RedisCommands.SCARD_INT, key); + } + + @Override + public Boolean sIsMember(byte[] key, byte[] value) { + return read(key, StringCodec.INSTANCE, RedisCommands.SISMEMBER, key, value); + } + + @Override + public Set sInter(byte[]... keys) { + return write(keys[0], ByteArrayCodec.INSTANCE, RedisCommands.SINTER, keys); + } + + @Override + public Long sInterStore(byte[] destKey, byte[]... keys) { + List args = new ArrayList(keys.length + 1); + args.add(destKey); + args.addAll(Arrays.asList(keys)); + return write(keys[0], StringCodec.INSTANCE, RedisCommands.SINTERSTORE, args.toArray()); + } + + @Override + public Set sUnion(byte[]... keys) { + return write(keys[0], ByteArrayCodec.INSTANCE, RedisCommands.SUNION, keys); + } + + @Override + public Long sUnionStore(byte[] destKey, byte[]... keys) { + List args = new ArrayList(keys.length + 1); + args.add(destKey); + args.addAll(Arrays.asList(keys)); + return write(keys[0], StringCodec.INSTANCE, RedisCommands.SUNIONSTORE, args.toArray()); + } + + @Override + public Set sDiff(byte[]... keys) { + return write(keys[0], ByteArrayCodec.INSTANCE, RedisCommands.SDIFF, keys); + } + + @Override + public Long sDiffStore(byte[] destKey, byte[]... keys) { + List args = new ArrayList(keys.length + 1); + args.add(destKey); + args.addAll(Arrays.asList(keys)); + return write(keys[0], StringCodec.INSTANCE, RedisCommands.SDIFFSTORE, args.toArray()); + } + + @Override + public Set sMembers(byte[] key) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.SMEMBERS, key); + } + + @Override + public byte[] sRandMember(byte[] key) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.SRANDMEMBER_SINGLE, key); + } + + @Override + public List sRandMember(byte[] key, long count) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.SRANDMEMBER, key); + } + + @Override + public Cursor sScan(byte[] key, ScanOptions options) { + return new KeyBoundCursor(key, 0, options) { + + private RedisClient client; + + @Override + protected ScanIteration doScan(byte[] key, long cursorId, ScanOptions options) { + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'SSCAN' cannot be called in pipeline / transaction mode."); + } + + List args = new ArrayList(); + args.add(key); + args.add(cursorId); + if (options.getPattern() != null) { + args.add("MATCH"); + args.add(options.getPattern()); + } + if (options.getCount() != null) { + args.add("COUNT"); + args.add(options.getCount()); + } + + RFuture> f = executorService.readAsync(client, key, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN, args.toArray()); + ListScanResult res = get(f); + client = res.getRedisClient(); + return new ScanIteration(res.getPos(), res.getValues()); + } + }.open(); + } + + @Override + public Boolean zAdd(byte[] key, double score, byte[] value) { + return write(key, StringCodec.INSTANCE, RedisCommands.ZADD_BOOL, key, BigDecimal.valueOf(score).toPlainString(), value); + } + + @Override + public Long zAdd(byte[] key, Set tuples) { + List params = new ArrayList(tuples.size()*2+1); + params.add(key); + for (Tuple entry : tuples) { + params.add(BigDecimal.valueOf(entry.getScore()).toPlainString()); + params.add(entry.getValue()); + } + return write(key, StringCodec.INSTANCE, RedisCommands.ZADD, params.toArray()); + } + + @Override + public Long zRem(byte[] key, byte[]... values) { + List params = new ArrayList(values.length+1); + params.add(key); + params.addAll(Arrays.asList(values)); + + return write(key, StringCodec.INSTANCE, RedisCommands.ZREM_LONG, params.toArray()); + } + + @Override + public Double zIncrBy(byte[] key, double increment, byte[] value) { + return write(key, DoubleCodec.INSTANCE, RedisCommands.ZINCRBY, + key, new BigDecimal(value.toString()).toPlainString(), value); + } + + @Override + public Long zRank(byte[] key, byte[] value) { + return read(key, StringCodec.INSTANCE, RedisCommands.ZRANK, key, value); + } + + @Override + public Long zRevRank(byte[] key, byte[] value) { + return read(key, StringCodec.INSTANCE, RedisCommands.ZREVRANK, key, value); + } + + private static final RedisCommand> ZRANGE = new RedisCommand>("ZRANGE", new ObjectSetReplayDecoder()); + + @Override + public Set zRange(byte[] key, long start, long end) { + return read(key, ByteArrayCodec.INSTANCE, ZRANGE, key, start, end); + } + + private static final RedisCommand> ZRANGE_ENTRY = new RedisCommand>("ZRANGE", new ScoredSortedSetReplayDecoder()); + + @Override + public Set zRangeWithScores(byte[] key, long start, long end) { + return read(key, ByteArrayCodec.INSTANCE, ZRANGE_ENTRY, key, start, end, "WITHSCORES"); + } + + private String value(Object score, boolean inclusive, String defaultValue) { + if (score == null) { + return defaultValue; + } + StringBuilder element = new StringBuilder(); + if (!inclusive) { + element.append("("); + } + if (score instanceof Double && Double.isInfinite((Double) score)) { + element.append((Double)score > 0 ? "+inf" : "-inf"); + } else { + element.append(BigDecimal.valueOf((Double)score).toPlainString()); + } + return element.toString(); + } + + @Override + public Set zRangeByScore(byte[] key, double min, double max) { + return zRangeByScore(key, new Range().gte(min).lte(max)); + } + + @Override + public Set zRangeByScoreWithScores(byte[] key, Range range) { + return zRangeByScoreWithScores(key, range, null); + } + + @Override + public Set zRangeByScoreWithScores(byte[] key, double min, double max) { + return zRangeByScoreWithScores(key, new Range().gte(min).lte(max)); + } + + @Override + public Set zRangeByScore(byte[] key, double min, double max, long offset, long count) { + return zRangeByScore(key, new Range().gte(min).lte(max), + new Limit().offset(Long.valueOf(offset).intValue()).count(Long.valueOf(count).intValue())); + } + + @Override + public Set zRangeByScoreWithScores(byte[] key, double min, double max, long offset, long count) { + return zRangeByScoreWithScores(key, new Range().gte(min).lte(max), + new Limit().offset(Long.valueOf(offset).intValue()).count(Long.valueOf(count).intValue())); + } + + @Override + public Set zRangeByScoreWithScores(byte[] key, Range range, Limit limit) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-inf"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+inf"); + + List args = new ArrayList(); + args.add(key); + args.add(min); + args.add(max); + args.add("WITHSCORES"); + + if (limit != null) { + args.add("LIMIT"); + args.add(limit.getOffset()); + args.add(limit.getCount()); + } + + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.ZRANGEBYSCORE, args.toArray()); + } + + private static final RedisCommand> ZREVRANGE = new RedisCommand>("ZREVRANGE", new ObjectSetReplayDecoder()); + + @Override + public Set zRevRange(byte[] key, long start, long end) { + return read(key, ByteArrayCodec.INSTANCE, ZREVRANGE, key, start, end); + } + + private static final RedisCommand> ZREVRANGE_ENTRY = new RedisCommand>("ZRANGE", new ScoredSortedSetReplayDecoder()); + + @Override + public Set zRevRangeWithScores(byte[] key, long start, long end) { + return read(key, ByteArrayCodec.INSTANCE, ZREVRANGE_ENTRY, key, start, end, "WITHSCORES"); + } + + @Override + public Set zRevRangeByScore(byte[] key, double min, double max) { + return zRevRangeByScore(key, new Range().gte(min).lte(max)); + } + + private static final RedisCommand> ZREVRANGEBYSCORE = new RedisCommand>("ZREVRANGEBYSCORE", new ObjectSetReplayDecoder()); + + @Override + public Set zRevRangeByScore(byte[] key, Range range) { + return zRevRangeByScore(key, range, null); + } + + @Override + public Set zRevRangeByScoreWithScores(byte[] key, double min, double max) { + return zRevRangeByScoreWithScores(key, new Range().gte(min).lte(max)); + } + + @Override + public Set zRevRangeByScore(byte[] key, double min, double max, long offset, long count) { + return zRevRangeByScore(key, new Range().gte(min).lte(max), + new Limit().offset(Long.valueOf(offset).intValue()).count(Long.valueOf(count).intValue())); + } + + @Override + public Set zRevRangeByScore(byte[] key, Range range, Limit limit) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-inf"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+inf"); + + List args = new ArrayList(); + args.add(key); + args.add(min); + args.add(max); + + if (limit != null) { + args.add("LIMIT"); + args.add(limit.getOffset()); + args.add(limit.getCount()); + } + + return read(key, ByteArrayCodec.INSTANCE, ZREVRANGEBYSCORE, args.toArray()); + } + + @Override + public Set zRevRangeByScoreWithScores(byte[] key, double min, double max, long offset, long count) { + return zRevRangeByScoreWithScores(key, new Range().gte(min).lte(max), + new Limit().offset(Long.valueOf(offset).intValue()).count(Long.valueOf(count).intValue())); + } + + @Override + public Set zRevRangeByScoreWithScores(byte[] key, Range range) { + return zRevRangeByScoreWithScores(key, range, null); + } + + @Override + public Set zRevRangeByScoreWithScores(byte[] key, Range range, Limit limit) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-inf"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+inf"); + + List args = new ArrayList(); + args.add(key); + args.add(min); + args.add(max); + args.add("WITHSCORES"); + + if (limit != null) { + args.add("LIMIT"); + args.add(limit.getOffset()); + args.add(limit.getCount()); + } + + return read(key, ByteArrayCodec.INSTANCE, ZREVRANGEBYSCORE, args.toArray()); + } + + @Override + public Long zCount(byte[] key, double min, double max) { + return zCount(key, new Range().gte(min).lte(max)); + } + + @Override + public Long zCount(byte[] key, Range range) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-inf"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+inf"); + return read(key, StringCodec.INSTANCE, RedisCommands.ZCOUNT, key, min, max); + } + + @Override + public Long zCard(byte[] key) { + return read(key, StringCodec.INSTANCE, RedisCommands.ZCARD, key); + } + + @Override + public Double zScore(byte[] key, byte[] value) { + return read(key, StringCodec.INSTANCE, RedisCommands.ZSCORE, key, value); + } + + private static final RedisStrictCommand ZREMRANGEBYRANK = new RedisStrictCommand("ZREMRANGEBYRANK"); + private static final RedisStrictCommand ZREMRANGEBYSCORE = new RedisStrictCommand("ZREMRANGEBYSCORE"); + + @Override + public Long zRemRange(byte[] key, long start, long end) { + return write(key, StringCodec.INSTANCE, ZREMRANGEBYRANK, key, start, end); + } + + @Override + public Long zRemRangeByScore(byte[] key, double min, double max) { + return zRemRangeByScore(key, new Range().gte(min).lte(max)); + } + + @Override + public Long zRemRangeByScore(byte[] key, Range range) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-inf"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+inf"); + return write(key, StringCodec.INSTANCE, ZREMRANGEBYSCORE, key, min, max); + } + + @Override + public Long zUnionStore(byte[] destKey, byte[]... sets) { + return zUnionStore(destKey, null, null, sets); + } + + private static final RedisStrictCommand ZUNIONSTORE = new RedisStrictCommand("ZUNIONSTORE"); + + @Override + public Long zUnionStore(byte[] destKey, Aggregate aggregate, int[] weights, byte[]... sets) { + List args = new ArrayList(sets.length*2 + 5); + args.add(destKey); + args.add(sets.length); + args.addAll(Arrays.asList(sets)); + if (weights != null) { + args.add("WEIGHTS"); + args.addAll(Arrays.asList(weights)); + } + if (aggregate != null) { + args.add("AGGREGATE"); + args.add(aggregate.name()); + } + return write(destKey, LongCodec.INSTANCE, ZUNIONSTORE, args.toArray()); + } + + private static final RedisStrictCommand ZINTERSTORE = new RedisStrictCommand("ZINTERSTORE"); + + @Override + public Long zInterStore(byte[] destKey, byte[]... sets) { + return zInterStore(destKey, null, null, sets); + } + + @Override + public Long zInterStore(byte[] destKey, Aggregate aggregate, int[] weights, byte[]... sets) { + List args = new ArrayList(sets.length*2 + 5); + args.add(destKey); + args.add(sets.length); + args.addAll(Arrays.asList(sets)); + if (weights != null) { + args.add("WEIGHTS"); + args.addAll(Arrays.asList(weights)); + } + if (aggregate != null) { + args.add("AGGREGATE"); + args.add(aggregate.name()); + } + return write(destKey, StringCodec.INSTANCE, ZINTERSTORE, args.toArray()); + } + + private static final RedisCommand> ZSCAN = new RedisCommand>("ZSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ScoredSortedSetReplayDecoder(), new ScoredSortedSetScanReplayDecoder())); + + @Override + public Cursor zScan(byte[] key, ScanOptions options) { + return new KeyBoundCursor(key, 0, options) { + + private RedisClient client; + + @Override + protected ScanIteration doScan(byte[] key, long cursorId, ScanOptions options) { + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'SSCAN' cannot be called in pipeline / transaction mode."); + } + + List args = new ArrayList(); + args.add(key); + args.add(cursorId); + if (options.getPattern() != null) { + args.add("MATCH"); + args.add(options.getPattern()); + } + if (options.getCount() != null) { + args.add("COUNT"); + args.add(options.getCount()); + } + + RFuture> f = executorService.readAsync(client, key, ByteArrayCodec.INSTANCE, ZSCAN, args.toArray()); + ListScanResult res = get(f); + client = res.getRedisClient(); + return new ScanIteration(res.getPos(), res.getValues()); + } + }.open(); + } + + @Override + public Set zRangeByScore(byte[] key, String min, String max) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.ZRANGEBYSCORE, key, min, max); + } + + @Override + public Set zRangeByScore(byte[] key, Range range) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-inf"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+inf"); + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.ZRANGEBYSCORE, key, min, max); + } + + @Override + public Set zRangeByScore(byte[] key, String min, String max, long offset, long count) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.ZRANGEBYSCORE, key, min, max, "LIMIT", offset, count); + } + + @Override + public Set zRangeByScore(byte[] key, Range range, Limit limit) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-inf"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+inf"); + + List args = new ArrayList(); + args.add(key); + args.add(min); + args.add(max); + + if (limit != null) { + args.add("LIMIT"); + args.add(limit.getOffset()); + args.add(limit.getCount()); + } + + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.ZRANGEBYSCORE, args.toArray()); + } + + @Override + public Set zRangeByLex(byte[] key) { + return zRangeByLex(key, Range.unbounded()); + } + + private static final RedisCommand> ZRANGEBYLEX = new RedisCommand>("ZRANGEBYLEX", new ObjectSetReplayDecoder()); + + @Override + public Set zRangeByLex(byte[] key, Range range) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+"); + return read(key, ByteArrayCodec.INSTANCE, ZRANGEBYLEX, key, min, max); + } + + @Override + public Set zRangeByLex(byte[] key, Range range, Limit limit) { + String min = value(range.getMin().getValue(), range.getMin().isIncluding(), "-"); + String max = value(range.getMax().getValue(), range.getMax().isIncluding(), "+"); + + List args = new ArrayList(); + args.add(key); + args.add(min); + args.add(max); + + if (limit != null) { + args.add("LIMIT"); + args.add(limit.getOffset()); + args.add(limit.getCount()); + } + + return read(key, ByteArrayCodec.INSTANCE, ZRANGEBYLEX, args.toArray()); + } + + @Override + public Boolean hSet(byte[] key, byte[] field, byte[] value) { + return write(key, StringCodec.INSTANCE, RedisCommands.HSET, key, field, value); + } + + @Override + public Boolean hSetNX(byte[] key, byte[] field, byte[] value) { + return write(key, StringCodec.INSTANCE, RedisCommands.HSETNX, key, field, value); + } + + @Override + public byte[] hGet(byte[] key, byte[] field) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.HGET, key, field); + } + + private static final RedisCommand> HMGET = new RedisCommand>("HMGET", new ObjectListReplayDecoder()); + + @Override + public List hMGet(byte[] key, byte[]... fields) { + List args = new ArrayList(fields.length + 1); + args.add(key); + args.addAll(Arrays.asList(fields)); + return read(key, ByteArrayCodec.INSTANCE, HMGET, args.toArray()); + } + + @Override + public void hMSet(byte[] key, Map hashes) { + List params = new ArrayList(hashes.size()*2 + 1); + params.add(key); + for (Map.Entry entry : hashes.entrySet()) { + params.add(entry.getKey()); + params.add(entry.getValue()); + } + + write(key, StringCodec.INSTANCE, RedisCommands.HMSET, params.toArray()); + } + + private static final RedisCommand HINCRBY = new RedisCommand("HINCRBY"); + + @Override + public Long hIncrBy(byte[] key, byte[] field, long delta) { + return write(key, StringCodec.INSTANCE, HINCRBY, key, field, delta); + } + + @Override + public Double hIncrBy(byte[] key, byte[] field, double delta) { + return write(key, StringCodec.INSTANCE, HINCRBY, key, field, BigDecimal.valueOf(delta).toPlainString()); + } + + @Override + public Boolean hExists(byte[] key, byte[] field) { + return read(key, StringCodec.INSTANCE, RedisCommands.HEXISTS, key, field); + } + + @Override + public Long hDel(byte[] key, byte[]... fields) { + List args = new ArrayList(fields.length + 1); + args.add(key); + args.addAll(Arrays.asList(fields)); + return write(key, StringCodec.INSTANCE, RedisCommands.HDEL, args.toArray()); + } + + private static final RedisStrictCommand HLEN = new RedisStrictCommand("HLEN"); + + @Override + public Long hLen(byte[] key) { + return read(key, StringCodec.INSTANCE, HLEN, key); + } + + @Override + public Set hKeys(byte[] key) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.HKEYS, key); + } + + @Override + public List hVals(byte[] key) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.HVALS, key); + } + + @Override + public Map hGetAll(byte[] key) { + return read(key, ByteArrayCodec.INSTANCE, RedisCommands.HGETALL, key); + } + + @Override + public Cursor> hScan(byte[] key, ScanOptions options) { + return new KeyBoundCursor>(key, 0, options) { + + private RedisClient client; + + @Override + protected ScanIteration> doScan(byte[] key, long cursorId, ScanOptions options) { + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'HSCAN' cannot be called in pipeline / transaction mode."); + } + + List args = new ArrayList(); + args.add(key); + args.add(cursorId); + if (options.getPattern() != null) { + args.add("MATCH"); + args.add(options.getPattern()); + } + if (options.getCount() != null) { + args.add("COUNT"); + args.add(options.getCount()); + } + + RFuture> f = executorService.readAsync(client, key, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN, args.toArray()); + MapScanResult res = get(f); + client = res.getRedisClient(); + return new ScanIteration>(res.getPos(), res.getValues()); + } + }.open(); + } + + @Override + public void multi() { + BatchOptions options = BatchOptions.defaults() + .executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); + this.executorService = new CommandBatchService(redisson.getConnectionManager(), options); + } + + @Override + public List exec() { + if (isQueueing()) { + BatchResult result = ((CommandBatchService)executorService).execute(); + return (List) result.getResponses(); + } else { + throw new InvalidDataAccessApiUsageException("Not in transaction mode. Please invoke multi method"); + } + } + + @Override + public void discard() { + if (isQueueing()) { + get(executorService.writeAsync(null, RedisCommands.DISCARD)); + } else { + throw new InvalidDataAccessApiUsageException("Not in transaction mode. Please invoke multi method"); + } + } + + @Override + public void watch(byte[]... keys) { + get(executorService.writeAsync(null, RedisCommands.WATCH, keys)); + } + + @Override + public void unwatch() { + get(executorService.writeAsync(null, RedisCommands.UNWATCH)); + } + + @Override + public boolean isSubscribed() { + // TODO Auto-generated method stub + return false; + } + + @Override + public Subscription getSubscription() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Long publish(byte[] channel, byte[] message) { + return write(channel, StringCodec.INSTANCE, RedisCommands.PUBLISH, channel, message); + } + + @Override + public void subscribe(MessageListener listener, byte[]... channels) { + // TODO Auto-generated method stub + + } + + @Override + public void pSubscribe(MessageListener listener, byte[]... patterns) { + // TODO Auto-generated method stub + + } + + @Override + public void select(int dbIndex) { + // TODO Auto-generated method stub + + } + + @Override + public byte[] echo(byte[] message) { + // TODO Auto-generated method stub + return null; + } + + @Override + public String ping() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void bgWriteAof() { + // TODO Auto-generated method stub + + } + + @Override + public void bgReWriteAof() { + // TODO Auto-generated method stub + + } + + @Override + public void bgSave() { + // TODO Auto-generated method stub + + } + + @Override + public Long lastSave() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void save() { + // TODO Auto-generated method stub + + } + + @Override + public Long dbSize() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void flushDb() { + get(executorService.writeAllAsync(RedisCommands.FLUSHDB)); + } + + @Override + public void flushAll() { + get(executorService.writeAllAsync(RedisCommands.FLUSHALL)); + } + + @Override + public Properties info() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Properties info(String section) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void shutdown() { + // TODO Auto-generated method stub + + } + + @Override + public void shutdown(ShutdownOption option) { + // TODO Auto-generated method stub + + } + + @Override + public List getConfig(String pattern) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setConfig(String param, String value) { + // TODO Auto-generated method stub + + } + + @Override + public void resetConfigStats() { + // TODO Auto-generated method stub + + } + + @Override + public Long time() { + Time t = read(null, StringCodec.INSTANCE, RedisCommands.TIME); + return t.getSeconds() * 1000L + t.getMicroseconds() / 1000L; + } + + private static final RedisStrictCommand CLIENT_KILL = new RedisStrictCommand("CLIENT", "KILL", new VoidReplayConvertor()); + + @Override + public void killClient(String host, int port) { + get(executorService.writeAllAsync(CLIENT_KILL, host + ":" + port)); + } + + @Override + public void setClientName(byte[] name) { + // TODO Auto-generated method stub + + } + + @Override + public String getClientName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getClientList() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void slaveOf(String host, int port) { + // TODO Auto-generated method stub + + } + + @Override + public void slaveOfNoOne() { + // TODO Auto-generated method stub + + } + + @Override + public void migrate(byte[] key, RedisNode target, int dbIndex, MigrateOption option) { + migrate(key, target, dbIndex, option, Long.MAX_VALUE); + } + + @Override + public void migrate(byte[] key, RedisNode target, int dbIndex, MigrateOption option, long timeout) { + write(key, StringCodec.INSTANCE, RedisCommands.MIGRATE, target.getHost(), target.getPort(), key, dbIndex, timeout); + } + + @Override + public void scriptFlush() { + get(executorService.writeAllAsync(RedisCommands.SCRIPT_FLUSH)); + } + + @Override + public void scriptKill() { + // TODO Auto-generated method stub + + } + + @Override + public String scriptLoad(byte[] script) { + return get(executorService.writeAllAsync(StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, new SlotCallback() { + volatile String result; + @Override + public void onSlotResult(String result) { + this.result = result; + } + + @Override + public String onFinish() { + return result; + } + }, script)); + } + + @Override + public List scriptExists(final String... scriptShas) { + return get(executorService.writeAllAsync(RedisCommands.SCRIPT_EXISTS, new SlotCallback, List>() { + + List result = new ArrayList(scriptShas.length); + + @Override + public synchronized void onSlotResult(List result) { + for (int i = 0; i < result.size(); i++) { + if (this.result.size() == i) { + this.result.add(false); + } + this.result.set(i, this.result.get(i) | result.get(i)); + } + } + + @Override + public List onFinish() { + return result; + } + }, (Object[])scriptShas)); + } + + @Override + public T eval(byte[] script, ReturnType returnType, int numKeys, byte[]... keysAndArgs) { + RedisCommand c = toCommand(returnType); + List params = new ArrayList(); + params.add(script); + params.add(numKeys); + params.addAll(Arrays.asList(keysAndArgs)); + return write(null, ByteArrayCodec.INSTANCE, new RedisCommand(c, "EVAL"), params.toArray()); + } + + protected RedisCommand toCommand(ReturnType returnType) { + RedisCommand c = null; + if (returnType == ReturnType.BOOLEAN) { + c = org.redisson.api.RScript.ReturnType.BOOLEAN.getCommand(); + } else if (returnType == ReturnType.INTEGER) { + c = org.redisson.api.RScript.ReturnType.INTEGER.getCommand(); + } else if (returnType == ReturnType.MULTI) { + c = org.redisson.api.RScript.ReturnType.MULTI.getCommand(); + } else if (returnType == ReturnType.STATUS) { + c = org.redisson.api.RScript.ReturnType.STATUS.getCommand(); + } else if (returnType == ReturnType.VALUE) { + c = org.redisson.api.RScript.ReturnType.VALUE.getCommand(); + } + return c; + } + + @Override + public T evalSha(String scriptSha, ReturnType returnType, int numKeys, byte[]... keysAndArgs) { + RedisCommand c = toCommand(returnType); + List params = new ArrayList(); + params.add(scriptSha); + params.add(numKeys); + params.addAll(Arrays.asList(keysAndArgs)); + return write(null, ByteArrayCodec.INSTANCE, new RedisCommand(c, "EVALSHA"), params.toArray()); + } + + @Override + public T evalSha(byte[] scriptSha, ReturnType returnType, int numKeys, byte[]... keysAndArgs) { + RedisCommand c = toCommand(returnType); + List params = new ArrayList(); + params.add(scriptSha); + params.add(numKeys); + params.addAll(Arrays.asList(keysAndArgs)); + return write(null, ByteArrayCodec.INSTANCE, new RedisCommand(c, "EVALSHA"), params.toArray()); + } + + @Override + public Long geoAdd(byte[] key, Point point, byte[] member) { + return write(key, StringCodec.INSTANCE, RedisCommands.GEOADD, key, point.getX(), point.getY(), member); + } + + @Override + public Long geoAdd(byte[] key, GeoLocation location) { + return write(key, StringCodec.INSTANCE, RedisCommands.GEOADD, key, location.getPoint().getX(), location.getPoint().getY(), location.getName()); + } + + @Override + public Long geoAdd(byte[] key, Map memberCoordinateMap) { + List params = new ArrayList(memberCoordinateMap.size()*3 + 1); + params.add(key); + for (Entry entry : memberCoordinateMap.entrySet()) { + params.add(entry.getValue().getX()); + params.add(entry.getValue().getY()); + params.add(entry.getKey()); + } + return write(key, StringCodec.INSTANCE, RedisCommands.GEOADD, params.toArray()); + } + + @Override + public Long geoAdd(byte[] key, Iterable> locations) { + List params = new ArrayList(); + params.add(key); + for (GeoLocation location : locations) { + params.add(location.getPoint().getX()); + params.add(location.getPoint().getY()); + params.add(location.getName()); + } + return write(key, StringCodec.INSTANCE, RedisCommands.GEOADD, params.toArray()); + } + + @Override + public Distance geoDist(byte[] key, byte[] member1, byte[] member2) { + return geoDist(key, member1, member2, DistanceUnit.METERS); + } + + @Override + public Distance geoDist(byte[] key, byte[] member1, byte[] member2, Metric metric) { + return read(key, DoubleCodec.INSTANCE, new RedisCommand("GEODIST", new DistanceConvertor(metric)), key, member1, member2, metric); + } + + private static final RedisCommand> GEOHASH = new RedisCommand>("GEOHASH", new ObjectListReplayDecoder()); + + @Override + public List geoHash(byte[] key, byte[]... members) { + List params = new ArrayList(members.length + 1); + params.add(key); + for (byte[] member : members) { + params.add(member); + } + return read(key, StringCodec.INSTANCE, GEOHASH, params.toArray()); + } + + @Override + public List geoPos(byte[] key, byte[]... members) { + List params = new ArrayList(members.length + 1); + params.add(key); + for (byte[] member : members) { + params.add(member); + } + + MultiDecoder> decoder = new ListMultiDecoder(new PointDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new ObjectListReplayDecoder()); + RedisCommand> command = new RedisCommand>("GEOPOS", decoder); + return read(key, StringCodec.INSTANCE, command, params.toArray()); + } + + private String convert(double longitude) { + return BigDecimal.valueOf(longitude).toPlainString(); + } + + private final MultiDecoder>> postitionDecoder = new ListMultiDecoder(new CodecDecoder(), new PointDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoResultsDecoder()); + + @Override + public GeoResults> geoRadius(byte[] key, Circle within) { + RedisCommand>> command = new RedisCommand>>("GEORADIUS", postitionDecoder); + return read(key, ByteArrayCodec.INSTANCE, command, key, + convert(within.getCenter().getX()), convert(within.getCenter().getY()), + within.getRadius().getValue(), within.getRadius().getMetric().getAbbreviation()); + } + + @Override + public GeoResults> geoRadius(byte[] key, Circle within, GeoRadiusCommandArgs args) { + List params = new ArrayList(); + params.add(key); + params.add(convert(within.getCenter().getX())); + params.add(convert(within.getCenter().getY())); + params.add(within.getRadius().getValue()); + params.add(within.getRadius().getMetric().getAbbreviation()); + + RedisCommand>> command; + if (args.getFlags().contains(GeoRadiusCommandArgs.Flag.WITHCOORD)) { + command = new RedisCommand>>("GEORADIUS", postitionDecoder); + params.add("WITHCOORD"); + } else { + MultiDecoder>> distanceDecoder = new ListMultiDecoder(new GeoDistanceDecoder(), new GeoResultsDecoder(within.getRadius().getMetric())); + command = new RedisCommand>>("GEORADIUS", distanceDecoder); + params.add("WITHDIST"); + } + + if (args.getLimit() != null) { + params.add("COUNT"); + params.add(args.getLimit()); + } + if (args.getSortDirection() != null) { + params.add(args.getSortDirection().name()); + } + + return read(key, ByteArrayCodec.INSTANCE, command, params.toArray()); + } + + @Override + public GeoResults> geoRadiusByMember(byte[] key, byte[] member, double radius) { + return geoRadiusByMember(key, member, new Distance(radius, DistanceUnit.METERS)); + } + + private static final RedisCommand>> GEORADIUSBYMEMBER = new RedisCommand>>("GEORADIUSBYMEMBER", new GeoResultsDecoder()); + + @Override + public GeoResults> geoRadiusByMember(byte[] key, byte[] member, Distance radius) { + return read(key, ByteArrayCodec.INSTANCE, GEORADIUSBYMEMBER, key, member, radius.getValue(), radius.getMetric().getAbbreviation()); + } + + @Override + public GeoResults> geoRadiusByMember(byte[] key, byte[] member, Distance radius, + GeoRadiusCommandArgs args) { + List params = new ArrayList(); + params.add(key); + params.add(member); + params.add(radius.getValue()); + params.add(radius.getMetric().getAbbreviation()); + + RedisCommand>> command; + if (args.getFlags().contains(GeoRadiusCommandArgs.Flag.WITHCOORD)) { + command = new RedisCommand>>("GEORADIUSBYMEMBER", postitionDecoder); + params.add("WITHCOORD"); + } else { + MultiDecoder>> distanceDecoder = new ListMultiDecoder(new GeoDistanceDecoder(), new GeoResultsDecoder(radius.getMetric())); + command = new RedisCommand>>("GEORADIUSBYMEMBER", distanceDecoder); + params.add("WITHDIST"); + } + + if (args.getLimit() != null) { + params.add("COUNT"); + params.add(args.getLimit()); + } + if (args.getSortDirection() != null) { + params.add(args.getSortDirection().name()); + } + + return read(key, ByteArrayCodec.INSTANCE, command, params.toArray()); + } + + @Override + public Long geoRemove(byte[] key, byte[]... members) { + return zRem(key, members); + } + + private static final RedisCommand PFADD = new RedisCommand("PFADD"); + + @Override + public Long pfAdd(byte[] key, byte[]... values) { + List params = new ArrayList(values.length + 1); + params.add(key); + for (byte[] member : values) { + params.add(member); + } + + return write(key, StringCodec.INSTANCE, PFADD, params.toArray()); + } + + @Override + public Long pfCount(byte[]... keys) { + return write(keys[0], StringCodec.INSTANCE, RedisCommands.PFCOUNT, keys); + } + + @Override + public void pfMerge(byte[] destinationKey, byte[]... sourceKeys) { + List args = new ArrayList(sourceKeys.length + 1); + args.add(destinationKey); + args.addAll(Arrays.asList(sourceKeys)); + write(destinationKey, StringCodec.INSTANCE, RedisCommands.PFMERGE, args.toArray()); + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java new file mode 100644 index 000000000..87f082944 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonConnectionFactory.java @@ -0,0 +1,93 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.ExceptionTranslationStrategy; +import org.springframework.data.redis.PassThroughExceptionTranslationStrategy; +import org.springframework.data.redis.connection.RedisClusterConnection; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisSentinelConnection; + +/** + * Redisson based connection factory + * + * @author Nikita Koksharov + * + */ +public class RedissonConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean { + + private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = + new PassThroughExceptionTranslationStrategy(new RedissonExceptionConverter()); + + private Config config; + private RedissonClient redisson; + + public RedissonConnectionFactory(RedissonClient redisson) { + this.redisson = redisson; + } + + public RedissonConnectionFactory(Config config) { + super(); + this.config = config; + } + + @Override + public DataAccessException translateExceptionIfPossible(RuntimeException ex) { + return EXCEPTION_TRANSLATION.translate(ex); + } + + @Override + public void destroy() throws Exception { + redisson.shutdown(); + } + + @Override + public void afterPropertiesSet() throws Exception { + if (config != null) { + redisson = Redisson.create(config); + } + } + + @Override + public RedisConnection getConnection() { + return new RedissonConnection(redisson); + } + + @Override + public RedisClusterConnection getClusterConnection() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getConvertPipelineAndTxResults() { + return true; + } + + @Override + public RedisSentinelConnection getSentinelConnection() { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/RedissonExceptionConverter.java b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonExceptionConverter.java new file mode 100644 index 000000000..96a70d409 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/RedissonExceptionConverter.java @@ -0,0 +1,61 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import org.redisson.client.RedisConnectionException; +import org.redisson.client.RedisException; +import org.redisson.client.RedisRedirectException; +import org.redisson.client.RedisTimeoutException; +import org.springframework.core.convert.converter.Converter; +import org.springframework.dao.DataAccessException; +import org.springframework.dao.QueryTimeoutException; +import org.springframework.data.redis.ClusterRedirectException; +import org.springframework.data.redis.RedisConnectionFailureException; +import org.springframework.data.redis.RedisSystemException; + +/** + * Converts Redisson exceptions to Spring compatible + * + * @author Nikita Koksharov + * + */ +public class RedissonExceptionConverter implements Converter { + + @Override + public DataAccessException convert(Exception source) { + if (source instanceof RedisConnectionException) { + return new RedisConnectionFailureException(source.getMessage(), source); + } + if (source instanceof RedisRedirectException) { + RedisRedirectException ex = (RedisRedirectException) source; + return new ClusterRedirectException(ex.getSlot(), ex.getUrl().getHost(), ex.getUrl().getPort(), source); + } + if (source instanceof RedisException) { + return new RedisSystemException(source.getMessage(), source); + } + + if (source instanceof DataAccessException) { + return (DataAccessException) source; + } + + if (source instanceof RedisTimeoutException) { + return new QueryTimeoutException(source.getMessage(), source); + } + + return null; + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/ScoredSortedSetReplayDecoder.java b/redisson/src/main/java/org/redisson/spring/data/connection/ScoredSortedSetReplayDecoder.java new file mode 100644 index 000000000..d7ab41150 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/ScoredSortedSetReplayDecoder.java @@ -0,0 +1,53 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.decoder.MultiDecoder; +import org.springframework.data.redis.connection.DefaultTuple; +import org.springframework.data.redis.connection.RedisZSetCommands.Tuple; + +/** + * + * @author Nikita Koksharov + * + */ +public class ScoredSortedSetReplayDecoder implements MultiDecoder> { + + @Override + public Decoder getDecoder(int paramNum, State state) { + if (paramNum % 2 != 0) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } + return null; + } + + @Override + public Set decode(List parts, State state) { + Set result = new LinkedHashSet(); + for (int i = 0; i < parts.size(); i += 2) { + result.add(new DefaultTuple((byte[])parts.get(i), ((Number)parts.get(i+1)).doubleValue())); + } + return result; + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java b/redisson/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java new file mode 100644 index 000000000..4a0cbba8a --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java @@ -0,0 +1,41 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import java.util.concurrent.TimeUnit; + +import org.redisson.client.protocol.convertor.SingleConvertor; + +/** + * + * @author Nikita Koksharov + * + */ +public class SecondsConvertor extends SingleConvertor { + + private final TimeUnit unit; + + public SecondsConvertor(TimeUnit unit) { + super(); + this.unit = unit; + } + + @Override + public Long convert(Object obj) { + return unit.convert((Long)obj, TimeUnit.SECONDS); + } + +} diff --git a/redisson/src/main/java/org/redisson/spring/data/connection/SetReplayDecoder.java b/redisson/src/main/java/org/redisson/spring/data/connection/SetReplayDecoder.java new file mode 100644 index 000000000..cb40d0349 --- /dev/null +++ b/redisson/src/main/java/org/redisson/spring/data/connection/SetReplayDecoder.java @@ -0,0 +1,50 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.spring.data.connection; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.decoder.MultiDecoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class SetReplayDecoder implements MultiDecoder> { + + private final Decoder decoder; + + public SetReplayDecoder(Decoder decoder) { + super(); + this.decoder = decoder; + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + return decoder; + } + + @Override + public Set decode(List parts, State state) { + return new LinkedHashSet(parts); + } + +}