From d854ce0f3a0d13e16e84a6cdc0d3c0215b40c251 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 31 Mar 2016 20:34:31 +0300 Subject: [PATCH] RGeo implemented. #295 --- src/main/java/org/redisson/Redisson.java | 11 + .../java/org/redisson/RedissonClient.java | 19 ++ src/main/java/org/redisson/RedissonGeo.java | 194 ++++++++++++++++++ .../redisson/client/codec/GeoEntryCodec.java | 45 ++++ .../redisson/client/codec/ScoredCodec.java | 2 +- .../client/handler/CommandEncoder.java | 8 +- .../client/protocol/RedisCommand.java | 2 +- .../client/protocol/RedisCommands.java | 13 +- .../convertor/DoubleReplayConvertor.java | 2 +- .../decoder/FlatNestedMultiDecoder.java | 44 ++++ .../protocol/decoder/GeoDistanceDecoder.java | 58 ++++++ .../decoder/GeoDistanceMapDecoder.java | 68 ++++++ .../protocol/decoder/GeoMapReplayDecoder.java | 48 +++++ .../protocol/decoder/GeoPositionDecoder.java | 50 +++++ .../decoder/GeoPositionMapDecoder.java | 62 ++++++ .../protocol/decoder/NestedMultiDecoder.java | 108 +++++++--- .../protocol/decoder/NestedMultiDecoder2.java | 86 -------- .../connection/decoder/MapGetAllDecoder.java | 4 + src/main/java/org/redisson/core/GeoEntry.java | 43 ++++ .../java/org/redisson/core/GeoPosition.java | 70 +++++++ src/main/java/org/redisson/core/GeoUnit.java | 48 +++++ src/main/java/org/redisson/core/RGeo.java | 51 +++++ .../java/org/redisson/core/RGeoAsync.java | 53 +++++ 23 files changed, 972 insertions(+), 117 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonGeo.java create mode 100644 src/main/java/org/redisson/client/codec/GeoEntryCodec.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/GeoDistanceDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/GeoDistanceMapDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/GeoPositionDecoder.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/GeoPositionMapDecoder.java delete mode 100644 src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java create mode 100644 src/main/java/org/redisson/core/GeoEntry.java create mode 100644 src/main/java/org/redisson/core/GeoPosition.java create mode 100644 src/main/java/org/redisson/core/GeoUnit.java create mode 100644 src/main/java/org/redisson/core/RGeo.java create mode 100644 src/main/java/org/redisson/core/RGeoAsync.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index f1fdf2953..a6c88ea08 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -49,6 +49,7 @@ import org.redisson.core.RBloomFilter; import org.redisson.core.RBucket; import org.redisson.core.RCountDownLatch; import org.redisson.core.RDeque; +import org.redisson.core.RGeo; import org.redisson.core.RHyperLogLog; import org.redisson.core.RKeys; import org.redisson.core.RLexSortedSet; @@ -180,6 +181,16 @@ public class Redisson implements RedissonClient { public static RedissonReactiveClient createReactive(Config config) { return new RedissonReactive(config); } + + @Override + public RGeo getGeo(String name) { + return new RedissonGeo(commandExecutor, name); + } + + @Override + public RGeo getGeo(String name, Codec codec) { + return new RedissonGeo(codec, commandExecutor, name); + } @Override public RBucket getBucket(String name) { diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index edb10dced..a83750c7d 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -33,6 +33,7 @@ import org.redisson.core.RBloomFilter; import org.redisson.core.RBucket; import org.redisson.core.RCountDownLatch; import org.redisson.core.RDeque; +import org.redisson.core.RGeo; import org.redisson.core.RHyperLogLog; import org.redisson.core.RKeys; import org.redisson.core.RLexSortedSet; @@ -65,6 +66,24 @@ import org.redisson.core.RTopic; */ public interface RedissonClient { + /** + * Returns geospatial items holder instance by name. + * + * @param name + * @return + */ + RGeo getGeo(String name); + + /** + * Returns geospatial items holder instance by name + * using provided codec for geospatial members. + * + * @param name + * @param geospatial member codec + * @return + */ + RGeo getGeo(String name, Codec codec); + /** * Returns set-based cache instance by name. * Supports value eviction with a given TTL value. diff --git a/src/main/java/org/redisson/RedissonGeo.java b/src/main/java/org/redisson/RedissonGeo.java new file mode 100644 index 000000000..dd36d3764 --- /dev/null +++ b/src/main/java/org/redisson/RedissonGeo.java @@ -0,0 +1,194 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.GeoEntryCodec; +import org.redisson.client.codec.ScoredCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.decoder.GeoDistanceDecoder; +import org.redisson.client.protocol.decoder.GeoMapReplayDecoder; +import org.redisson.client.protocol.decoder.GeoPositionDecoder; +import org.redisson.client.protocol.decoder.GeoPositionMapDecoder; +import org.redisson.client.protocol.decoder.MultiDecoder; +import org.redisson.client.protocol.decoder.NestedMultiDecoder; +import org.redisson.client.protocol.decoder.FlatNestedMultiDecoder; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.decoder.MapGetAllDecoder; +import org.redisson.core.GeoEntry; +import org.redisson.core.GeoPosition; +import org.redisson.core.GeoUnit; +import org.redisson.core.RGeo; + +import io.netty.util.concurrent.Future; + +public class RedissonGeo extends RedissonExpirable implements RGeo { + + public RedissonGeo(CommandAsyncExecutor connectionManager, String name) { + super(connectionManager, name); + } + + public RedissonGeo(Codec codec, CommandAsyncExecutor connectionManager, String name) { + super(codec, connectionManager, name); + } + + @Override + public Future addAsync(double longitude, double latitude, V member) { + return commandExecutor.writeAsync(getName(), RedisCommands.GEOADD, getName(), convert(longitude), convert(latitude), member); + } + + private String convert(double longitude) { + return BigDecimal.valueOf(longitude).toPlainString(); + } + + @Override + public long add(double longitude, double latitude, V member) { + return get(addAsync(longitude, latitude, member)); + } + + @Override + public long add(GeoEntry... entries) { + return get(addAsync(entries)); + } + + @Override + public Future addAsync(GeoEntry... entries) { + List params = new ArrayList(entries.length + 1); + params.add(getName()); + for (GeoEntry entry : entries) { + params.add(entry.getLongitude()); + params.add(entry.getLatitude()); + params.add(entry.getMember()); + } + return commandExecutor.writeAsync(getName(), new GeoEntryCodec(codec), RedisCommands.GEOADD_ENTRIES, params.toArray()); + } + + @Override + public Double dist(V firstMember, V secondMember, GeoUnit geoUnit) { + return get(distAsync(firstMember, secondMember, geoUnit)); + } + + @Override + public Future distAsync(V firstMember, V secondMember, GeoUnit geoUnit) { + return commandExecutor.readAsync(getName(), new ScoredCodec(codec), RedisCommands.GEODIST, getName(), firstMember, secondMember, geoUnit); + } + + @Override + public Map hash(V... members) { + return get(hashAsync(members)); + } + + @Override + public Future> hashAsync(V... members) { + List params = new ArrayList(members.length + 1); + params.add(getName()); + params.addAll(Arrays.asList(members)); + RedisCommand> command = new RedisCommand>("GEOHASH", new MapGetAllDecoder(params), 2, ValueType.OBJECTS); + return commandExecutor.readAsync(getName(), new ScoredCodec(codec), command, params.toArray()); + } + + @Override + public Map pos(V... members) { + return get(posAsync(members)); + } + + @Override + public Future> posAsync(V... members) { + List params = new ArrayList(members.length + 1); + params.add(getName()); + params.addAll(Arrays.asList(members)); + + MultiDecoder> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoPositionMapDecoder(params)); + RedisCommand> command = new RedisCommand>("GEOPOS", decoder, 2, ValueType.OBJECTS); + return commandExecutor.readAsync(getName(), new ScoredCodec(codec), command, params.toArray()); + } + + @Override + public List radius(double longitude, double latitude, double radius, GeoUnit geoUnit) { + return get(radiusAsync(longitude, latitude, radius, geoUnit)); + } + + @Override + public Future> radiusAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.GEORADIUS, getName(), convert(longitude), convert(latitude), radius, geoUnit); + } + + @Override + public Map radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit) { + return get(radiusWithDistanceAsync(longitude, latitude, radius, geoUnit)); + } + + @Override + public Future> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) { + MultiDecoder> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); + RedisCommand> command = new RedisCommand>("GEORADIUS", decoder); + return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHDIST"); + } + + @Override + public Map radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit) { + return get(radiusWithPositionAsync(longitude, latitude, radius, geoUnit)); + } + + @Override + public Future> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) { + MultiDecoder> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); + RedisCommand> command = new RedisCommand>("GEORADIUS", decoder); + return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHCOORD"); + } + + @Override + public List radius(V member, double radius, GeoUnit geoUnit) { + return get(radiusAsync(member, radius, geoUnit)); + } + + @Override + public Future> radiusAsync(V member, double radius, GeoUnit geoUnit) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.GEORADIUSBYMEMBER, getName(), member, radius, geoUnit); + } + + @Override + public Map radiusWithDistance(V member, double radius, GeoUnit geoUnit) { + return get(radiusWithDistanceAsync(member, radius, geoUnit)); + } + + @Override + public Future> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit) { + MultiDecoder> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); + RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER", decoder, 2); + return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHDIST"); + } + + @Override + public Map radiusWithPosition(V member, double radius, GeoUnit geoUnit) { + return get(radiusWithPositionAsync(member, radius, geoUnit)); + } + + @Override + public Future> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit) { + MultiDecoder> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder()); + RedisCommand> command = new RedisCommand>("GEORADIUSBYMEMBER", decoder, 2); + return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHCOORD"); + } +} diff --git a/src/main/java/org/redisson/client/codec/GeoEntryCodec.java b/src/main/java/org/redisson/client/codec/GeoEntryCodec.java new file mode 100644 index 000000000..71a985948 --- /dev/null +++ b/src/main/java/org/redisson/client/codec/GeoEntryCodec.java @@ -0,0 +1,45 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import org.redisson.client.protocol.Encoder; + +public class GeoEntryCodec extends StringCodec { + + private final ThreadLocal pos = new ThreadLocal() { + protected Integer initialValue() { + return 0; + }; + }; + + private final Codec delegate; + + public GeoEntryCodec(Codec delegate) { + super(); + this.delegate = delegate; + } + + @Override + public Encoder getValueEncoder() { + Integer p = pos.get() + 1; + pos.set(p); + if (p % 3 == 0) { + return delegate.getValueEncoder(); + } + return super.getValueEncoder(); + } + +} diff --git a/src/main/java/org/redisson/client/codec/ScoredCodec.java b/src/main/java/org/redisson/client/codec/ScoredCodec.java index 12378bc2f..f4bf3656e 100644 --- a/src/main/java/org/redisson/client/codec/ScoredCodec.java +++ b/src/main/java/org/redisson/client/codec/ScoredCodec.java @@ -19,7 +19,7 @@ import org.redisson.client.protocol.Encoder; public class ScoredCodec extends StringCodec { - public final Codec delegate; + private final Codec delegate; public ScoredCodec(Codec delegate) { super(); diff --git a/src/main/java/org/redisson/client/handler/CommandEncoder.java b/src/main/java/org/redisson/client/handler/CommandEncoder.java index 8c7560908..2a8b0af2c 100644 --- a/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -18,6 +18,7 @@ package org.redisson.client.handler; import java.util.List; import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.DefaultParamsEncoder; @@ -112,8 +113,11 @@ public class CommandEncoder extends MessageToByteEncoder { - public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY} + public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY, STRING} private ValueType outParamType = ValueType.OBJECT; private List inParamType = Arrays.asList(ValueType.OBJECT); diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 9528685ce..d56eae403 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -15,10 +15,11 @@ */ package org.redisson.client.protocol; +import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.convertor.BitSetReplayConvertor; @@ -40,7 +41,7 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; -import org.redisson.client.protocol.decoder.NestedMultiDecoder2; +import org.redisson.client.protocol.decoder.FlatNestedMultiDecoder; import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; @@ -57,6 +58,12 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; public interface RedisCommands { + RedisStrictCommand GEOADD = new RedisStrictCommand("GEOADD", 4); + RedisStrictCommand GEOADD_ENTRIES = new RedisStrictCommand("GEOADD", 2, ValueType.OBJECTS); + RedisCommand GEODIST = new RedisCommand("GEODIST", new DoubleReplayConvertor(), 2, Arrays.asList(ValueType.OBJECT, ValueType.OBJECT, ValueType.STRING)); + RedisCommand> GEORADIUS = new RedisCommand>("GEORADIUS", new ObjectListReplayDecoder()); + RedisCommand> GEORADIUSBYMEMBER = new RedisCommand>("GEORADIUSBYMEMBER", new ObjectListReplayDecoder(), 2); + RedisStrictCommand KEYSLOT = new RedisStrictCommand("CLUSTER", "KEYSLOT", new IntegerReplayConvertor()); RedisStrictCommand GETBIT = new RedisStrictCommand("GETBIT", new BooleanReplayConvertor()); @@ -238,7 +245,7 @@ public interface RedisCommands { RedisStrictCommand> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); RedisCommand>> SENTINEL_SLAVES = new RedisCommand>>("SENTINEL", "SLAVES", - new NestedMultiDecoder2(new ObjectMapReplayDecoder(), new ListResultReplayDecoder()), ValueType.OBJECT + new FlatNestedMultiDecoder(new ObjectMapReplayDecoder(), new ListResultReplayDecoder()), ValueType.OBJECT ); RedisStrictCommand INFO_REPLICATION = new RedisStrictCommand("INFO", "replication", new StringDataDecoder()); diff --git a/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java b/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java index 4bef92164..dd1a51eca 100644 --- a/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java +++ b/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java @@ -19,7 +19,7 @@ public class DoubleReplayConvertor extends SingleConvertor { @Override public Double convert(Object obj) { - if (obj == null) { + if (obj == null || obj.toString().isEmpty()) { return null; } return Double.valueOf(obj.toString()); diff --git a/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java new file mode 100644 index 000000000..413694b33 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java @@ -0,0 +1,44 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class FlatNestedMultiDecoder extends NestedMultiDecoder { + + public FlatNestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { + super(firstDecoder, secondDecoder); + } + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return firstDecoder.decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + DecoderState ds = getDecoder(state); + if (paramNum == 0) { + ds.resetDecoderIndex(); + } + return firstDecoder.isApplicable(paramNum, state); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/GeoDistanceDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/GeoDistanceDecoder.java new file mode 100644 index 000000000..4464f27b5 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/GeoDistanceDecoder.java @@ -0,0 +1,58 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.List; + +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class GeoDistanceDecoder implements MultiDecoder> { + + private final ThreadLocal pos = new ThreadLocal(); + + private final Codec codec; + + public GeoDistanceDecoder(Codec codec) { + super(); + this.codec = codec; + } + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + if (pos.get() % 2 == 0) { + return codec.getValueDecoder().decode(buf, state); + } + return DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + pos.set(paramNum); + return true; + } + + @Override + public List decode(List parts, State state) { + return parts; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/GeoDistanceMapDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/GeoDistanceMapDecoder.java new file mode 100644 index 000000000..ac4f98393 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/GeoDistanceMapDecoder.java @@ -0,0 +1,68 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class GeoDistanceMapDecoder implements MultiDecoder> { + + private final ThreadLocal pos = new ThreadLocal(); + + private final Codec codec; + + public GeoDistanceMapDecoder(Codec codec) { + super(); + this.codec = codec; + } + + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + System.out.println("1 " + buf.toString(CharsetUtil.UTF_8)); + if (pos.get() % 2 == 0) { + return codec.getValueDecoder().decode(buf, state); + } + return DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + pos.set(paramNum); + return true; + } + + @Override + public Map decode(List parts, State state) { + System.out.println(parts); + Map result = new HashMap(parts.size()/2); + for (int i = 0; i < parts.size(); i++) { + if (i % 2 != 0) { + result.put(parts.get(i-1), parts.get(i)); + } + } + return result; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java new file mode 100644 index 000000000..94c043a47 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java @@ -0,0 +1,48 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class GeoMapReplayDecoder implements MultiDecoder> { + + @Override + public Object decode(ByteBuf buf, State state) { + throw new UnsupportedOperationException(); + } + + @Override + public Map decode(List parts, State state) { + Map result = new HashMap(parts.size()); + for (Object object : parts) { + List vals = ((List) object); + result.put(vals.get(0), vals.get(1)); + } + return result; + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return false; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/GeoPositionDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/GeoPositionDecoder.java new file mode 100644 index 000000000..3b3fea8f0 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/GeoPositionDecoder.java @@ -0,0 +1,50 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.List; + +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.handler.State; +import org.redisson.core.GeoPosition; + +import io.netty.buffer.ByteBuf; + +public class GeoPositionDecoder implements MultiDecoder { + + @Override + public Double decode(ByteBuf buf, State state) throws IOException { + return (Double) DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return true; + } + + @Override + public GeoPosition decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } + + Double longitude = Double.valueOf(parts.get(0).toString()); + Double latitude = Double.valueOf(parts.get(1).toString()); + return new GeoPosition(longitude, latitude); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/GeoPositionMapDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/GeoPositionMapDecoder.java new file mode 100644 index 000000000..07ac3662f --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/GeoPositionMapDecoder.java @@ -0,0 +1,62 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; + +public class GeoPositionMapDecoder implements MultiDecoder> { + + private final List args; + + public GeoPositionMapDecoder(List args) { + this.args = args; + } + + @Override + public Double decode(ByteBuf buf, State state) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return false; + } + + @Override + public Map decode(List parts, State state) { + if (parts.isEmpty()) { + return Collections.emptyMap(); + } + Map result = new HashMap(parts.size()); + for (int index = 0; index < args.size()-1; index++) { + Object value = parts.get(index); + if (value == null || value == Collections.emptyMap()) { + continue; + } + result.put(args.get(index+1), value); + } + return result; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java index f8ac24ff1..c2a5f5fdd 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java @@ -16,9 +16,6 @@ package org.redisson.client.protocol.decoder; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Deque; import java.util.List; import org.redisson.client.handler.State; @@ -29,53 +26,94 @@ public class NestedMultiDecoder implements MultiDecoder { public static class DecoderState { - Deque> decoders; + int decoderIndex; + + int flipDecoderIndex; - Deque> flipDecoders; - - public DecoderState(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { - super(); - this.decoders = new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder)); - this.flipDecoders = new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder)); + public DecoderState() { } - public Deque> getDecoders() { - return decoders; + public int getDecoderIndex() { + return decoderIndex; } - - public Deque> getFlipDecoders() { - return flipDecoders; + public void resetDecoderIndex() { + decoderIndex = 0; + } + public void incDecoderIndex() { + decoderIndex++; + } + + public int getFlipDecoderIndex() { + return flipDecoderIndex; + } + public void resetFlipDecoderIndex() { + flipDecoderIndex = 0; + } + public void incFlipDecoderIndex() { + flipDecoderIndex++; } } - private final MultiDecoder firstDecoder; - private final MultiDecoder secondDecoder; + protected final MultiDecoder firstDecoder; + protected final MultiDecoder secondDecoder; + private MultiDecoder thirdDecoder; public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { this.firstDecoder = firstDecoder; this.secondDecoder = secondDecoder; } + + public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder, MultiDecoder thirdDecoder) { + this.firstDecoder = firstDecoder; + this.secondDecoder = secondDecoder; + this.thirdDecoder = thirdDecoder; + } + @Override public Object decode(ByteBuf buf, State state) throws IOException { DecoderState ds = getDecoder(state); - return ds.getFlipDecoders().peek().decode(buf, state); + + MultiDecoder decoder = null; + if (ds.getFlipDecoderIndex() == 2) { + decoder = firstDecoder; + } + if (ds.getFlipDecoderIndex() == 1) { + decoder = secondDecoder; + } + + return decoder.decode(buf, state); } @Override public boolean isApplicable(int paramNum, State state) { DecoderState ds = getDecoder(state); if (paramNum == 0) { - ds.getFlipDecoders().poll(); + ds.incFlipDecoderIndex(); + ds.resetDecoderIndex(); + } + // used only with thirdDecoder + if (ds.getFlipDecoderIndex() == 3) { + ds.resetFlipDecoderIndex(); + ds.incFlipDecoderIndex(); + } + + MultiDecoder decoder = null; + if (ds.getFlipDecoderIndex() == 2) { + decoder = firstDecoder; + } + if (ds.getFlipDecoderIndex() == 1) { + decoder = secondDecoder; } - return ds.getFlipDecoders().peek().isApplicable(paramNum, state); + + return decoder.isApplicable(paramNum, state); } - private DecoderState getDecoder(State state) { + protected final DecoderState getDecoder(State state) { DecoderState ds = state.getDecoderState(); if (ds == null) { - ds = new DecoderState(firstDecoder, secondDecoder); + ds = new DecoderState(); state.setDecoderState(ds); } return ds; @@ -83,8 +121,32 @@ public class NestedMultiDecoder implements MultiDecoder { @Override public Object decode(List parts, State state) { + if (parts.isEmpty() && state.getDecoderState() == null) { + MultiDecoder decoder = secondDecoder; + if (thirdDecoder != null) { + decoder = thirdDecoder; + } + return decoder.decode(parts, state); + } + DecoderState ds = getDecoder(state); - return ds.getDecoders().poll().decode(parts, state); + if (parts.isEmpty()) { + ds.resetDecoderIndex(); + } + + ds.incDecoderIndex(); + MultiDecoder decoder = null; + if (ds.getDecoderIndex() == 1) { + decoder = firstDecoder; + } + if (ds.getDecoderIndex() == 2) { + decoder = secondDecoder; + } + if (ds.getDecoderIndex() == 3) { + decoder = thirdDecoder; + } + + return decoder.decode(parts, state); } } diff --git a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java deleted file mode 100644 index 36ebdac71..000000000 --- a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder2.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright 2014 Nikita Koksharov, Nickolay Borbit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol.decoder; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.redisson.client.handler.State; - -import io.netty.buffer.ByteBuf; - -public class NestedMultiDecoder2 implements MultiDecoder { - - private final MultiDecoder firstDecoder; - private final MultiDecoder secondDecoder; - - public NestedMultiDecoder2(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { - this.firstDecoder = firstDecoder; - this.secondDecoder = secondDecoder; - } - - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - return firstDecoder.decode(buf, state); - } - - @Override - public boolean isApplicable(int paramNum, State state) { - if (paramNum == 0) { - setCounter(state, 0); - } - return firstDecoder.isApplicable(paramNum, state); - } - - private Integer getCounter(State state) { - Integer value = state.getDecoderState(); - if (value == null) { - return 0; - } - return value; - } - - private void setCounter(State state, Integer value) { - state.setDecoderState(value); - } - - - @Override - public Object decode(List parts, State state) { - // handle empty result - if (parts.isEmpty() && state.getDecoderState() == null) { - return secondDecoder.decode(parts, state); - } - - int counter = getCounter(state); - if (counter == 2) { - counter = 0; - } - counter++; - setCounter(state, counter); - MultiDecoder decoder = null; - if (counter == 1) { - decoder = firstDecoder; - } - if (counter == 2) { - decoder = secondDecoder; - } - return decoder.decode(parts, state); - } - -} diff --git a/src/main/java/org/redisson/connection/decoder/MapGetAllDecoder.java b/src/main/java/org/redisson/connection/decoder/MapGetAllDecoder.java index 3cf057aaa..35aa171df 100644 --- a/src/main/java/org/redisson/connection/decoder/MapGetAllDecoder.java +++ b/src/main/java/org/redisson/connection/decoder/MapGetAllDecoder.java @@ -16,6 +16,7 @@ package org.redisson.connection.decoder; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,6 +46,9 @@ public class MapGetAllDecoder implements MultiDecoder> { @Override public Map decode(List parts, State state) { + if (parts.isEmpty()) { + return Collections.emptyMap(); + } Map result = new HashMap(parts.size()); for (int index = 0; index < args.size()-1; index++) { Object value = parts.get(index); diff --git a/src/main/java/org/redisson/core/GeoEntry.java b/src/main/java/org/redisson/core/GeoEntry.java new file mode 100644 index 000000000..7476b619f --- /dev/null +++ b/src/main/java/org/redisson/core/GeoEntry.java @@ -0,0 +1,43 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public class GeoEntry { + + private final double longitude; + private final double latitude; + private final Object member; + + public GeoEntry(double longitude, double latitude, Object member) { + super(); + this.longitude = longitude; + this.latitude = latitude; + this.member = member; + } + + public double getLatitude() { + return latitude; + } + + public double getLongitude() { + return longitude; + } + + public Object getMember() { + return member; + } + +} diff --git a/src/main/java/org/redisson/core/GeoPosition.java b/src/main/java/org/redisson/core/GeoPosition.java new file mode 100644 index 000000000..545749273 --- /dev/null +++ b/src/main/java/org/redisson/core/GeoPosition.java @@ -0,0 +1,70 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public class GeoPosition { + + private final double longitude; + private final double latitude; + + public GeoPosition(double longitude, double latitude) { + super(); + this.longitude = longitude; + this.latitude = latitude; + } + + public double getLatitude() { + return latitude; + } + + public double getLongitude() { + return longitude; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + long temp; + temp = Double.doubleToLongBits(latitude); + result = prime * result + (int) (temp ^ (temp >>> 32)); + temp = Double.doubleToLongBits(longitude); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + GeoPosition other = (GeoPosition) obj; + if (Double.doubleToLongBits(latitude) != Double.doubleToLongBits(other.latitude)) + return false; + if (Double.doubleToLongBits(longitude) != Double.doubleToLongBits(other.longitude)) + return false; + return true; + } + + @Override + public String toString() { + return "GeoPosition [longitude=" + longitude + ", latitude=" + latitude + "]"; + } + +} diff --git a/src/main/java/org/redisson/core/GeoUnit.java b/src/main/java/org/redisson/core/GeoUnit.java new file mode 100644 index 000000000..c4610ff93 --- /dev/null +++ b/src/main/java/org/redisson/core/GeoUnit.java @@ -0,0 +1,48 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public enum GeoUnit { + + METERS { + @Override + public String toString() { + return "m"; + } + }, + + KILOMETERS { + @Override + public String toString() { + return "km"; + } + }, + + MILES { + @Override + public String toString() { + return "mi"; + } + }, + + FEET { + @Override + public String toString() { + return "ft"; + } + } + +} diff --git a/src/main/java/org/redisson/core/RGeo.java b/src/main/java/org/redisson/core/RGeo.java new file mode 100644 index 000000000..c6c32dafc --- /dev/null +++ b/src/main/java/org/redisson/core/RGeo.java @@ -0,0 +1,51 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.List; +import java.util.Map; + +/** + * + * @author Nikita Koksharov + * + * @param + */ +public interface RGeo extends RExpirable, RGeoAsync { + + long add(double longitude, double latitude, V member); + + long add(GeoEntry... entries); + + Double dist(V firstMember, V secondMember, GeoUnit geoUnit); + + Map hash(V... members); + + Map pos(V... members); + + List radius(double longitude, double latitude, double radius, GeoUnit geoUnit); + + Map radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit); + + Map radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit); + + List radius(V member, double radius, GeoUnit geoUnit); + + Map radiusWithDistance(V member, double radius, GeoUnit geoUnit); + + Map radiusWithPosition(V member, double radius, GeoUnit geoUnit); + +} diff --git a/src/main/java/org/redisson/core/RGeoAsync.java b/src/main/java/org/redisson/core/RGeoAsync.java new file mode 100644 index 000000000..e2917d432 --- /dev/null +++ b/src/main/java/org/redisson/core/RGeoAsync.java @@ -0,0 +1,53 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.List; +import java.util.Map; + +import io.netty.util.concurrent.Future; + +/** + * + * @author Nikita Koksharov + * + * @param + */ +public interface RGeoAsync extends RExpirableAsync { + + Future addAsync(double longitude, double latitude, V member); + + Future addAsync(GeoEntry... entries); + + Future distAsync(V firstMember, V secondMember, GeoUnit geoUnit); + + Future> hashAsync(V... members); + + Future> posAsync(V... members); + + Future> radiusAsync(double longitude, double latitude, double radius, GeoUnit geoUnit); + + Future> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit); + + Future> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit); + + Future> radiusAsync(V member, double radius, GeoUnit geoUnit); + + Future> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit); + + Future> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit); + +}