RGeo implemented. #295

pull/461/head
Nikita 9 years ago
parent 49f10ee027
commit d854ce0f3a

@ -49,6 +49,7 @@ import org.redisson.core.RBloomFilter;
import org.redisson.core.RBucket;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RGeo;
import org.redisson.core.RHyperLogLog;
import org.redisson.core.RKeys;
import org.redisson.core.RLexSortedSet;
@ -180,6 +181,16 @@ public class Redisson implements RedissonClient {
public static RedissonReactiveClient createReactive(Config config) {
return new RedissonReactive(config);
}
@Override
public <V> RGeo<V> getGeo(String name) {
return new RedissonGeo<V>(commandExecutor, name);
}
@Override
public <V> RGeo<V> getGeo(String name, Codec codec) {
return new RedissonGeo<V>(codec, commandExecutor, name);
}
@Override
public <V> RBucket<V> getBucket(String name) {

@ -33,6 +33,7 @@ import org.redisson.core.RBloomFilter;
import org.redisson.core.RBucket;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RGeo;
import org.redisson.core.RHyperLogLog;
import org.redisson.core.RKeys;
import org.redisson.core.RLexSortedSet;
@ -65,6 +66,24 @@ import org.redisson.core.RTopic;
*/
public interface RedissonClient {
/**
* Returns geospatial items holder instance by <code>name</code>.
*
* @param name
* @return
*/
<V> RGeo<V> getGeo(String name);
/**
* Returns geospatial items holder instance by <code>name</code>
* using provided codec for geospatial members.
*
* @param name
* @param geospatial member codec
* @return
*/
<V> RGeo<V> getGeo(String name, Codec codec);
/**
* Returns set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.

@ -0,0 +1,194 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.GeoEntryCodec;
import org.redisson.client.codec.ScoredCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.GeoDistanceDecoder;
import org.redisson.client.protocol.decoder.GeoMapReplayDecoder;
import org.redisson.client.protocol.decoder.GeoPositionDecoder;
import org.redisson.client.protocol.decoder.GeoPositionMapDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.FlatNestedMultiDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.GeoEntry;
import org.redisson.core.GeoPosition;
import org.redisson.core.GeoUnit;
import org.redisson.core.RGeo;
import io.netty.util.concurrent.Future;
public class RedissonGeo<V> extends RedissonExpirable implements RGeo<V> {
public RedissonGeo(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
public RedissonGeo(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
@Override
public Future<Long> addAsync(double longitude, double latitude, V member) {
return commandExecutor.writeAsync(getName(), RedisCommands.GEOADD, getName(), convert(longitude), convert(latitude), member);
}
private String convert(double longitude) {
return BigDecimal.valueOf(longitude).toPlainString();
}
@Override
public long add(double longitude, double latitude, V member) {
return get(addAsync(longitude, latitude, member));
}
@Override
public long add(GeoEntry... entries) {
return get(addAsync(entries));
}
@Override
public Future<Long> addAsync(GeoEntry... entries) {
List<Object> params = new ArrayList<Object>(entries.length + 1);
params.add(getName());
for (GeoEntry entry : entries) {
params.add(entry.getLongitude());
params.add(entry.getLatitude());
params.add(entry.getMember());
}
return commandExecutor.writeAsync(getName(), new GeoEntryCodec(codec), RedisCommands.GEOADD_ENTRIES, params.toArray());
}
@Override
public Double dist(V firstMember, V secondMember, GeoUnit geoUnit) {
return get(distAsync(firstMember, secondMember, geoUnit));
}
@Override
public Future<Double> distAsync(V firstMember, V secondMember, GeoUnit geoUnit) {
return commandExecutor.readAsync(getName(), new ScoredCodec(codec), RedisCommands.GEODIST, getName(), firstMember, secondMember, geoUnit);
}
@Override
public Map<V, String> hash(V... members) {
return get(hashAsync(members));
}
@Override
public Future<Map<V, String>> hashAsync(V... members) {
List<Object> params = new ArrayList<Object>(members.length + 1);
params.add(getName());
params.addAll(Arrays.asList(members));
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOHASH", new MapGetAllDecoder(params), 2, ValueType.OBJECTS);
return commandExecutor.readAsync(getName(), new ScoredCodec(codec), command, params.toArray());
}
@Override
public Map<V, GeoPosition> pos(V... members) {
return get(posAsync(members));
}
@Override
public Future<Map<V, GeoPosition>> posAsync(V... members) {
List<Object> params = new ArrayList<Object>(members.length + 1);
params.add(getName());
params.addAll(Arrays.asList(members));
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoPositionMapDecoder(params));
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder, 2, ValueType.OBJECTS);
return commandExecutor.readAsync(getName(), new ScoredCodec(codec), command, params.toArray());
}
@Override
public List<V> radius(double longitude, double latitude, double radius, GeoUnit geoUnit) {
return get(radiusAsync(longitude, latitude, radius, geoUnit));
}
@Override
public Future<List<V>> radiusAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.GEORADIUS, getName(), convert(longitude), convert(latitude), radius, geoUnit);
}
@Override
public Map<V, Double> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit) {
return get(radiusWithDistanceAsync(longitude, latitude, radius, geoUnit));
}
@Override
public Future<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS", decoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHDIST");
}
@Override
public Map<V, GeoPosition> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit) {
return get(radiusWithPositionAsync(longitude, latitude, radius, geoUnit));
}
@Override
public Future<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS", decoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHCOORD");
}
@Override
public List<V> radius(V member, double radius, GeoUnit geoUnit) {
return get(radiusAsync(member, radius, geoUnit));
}
@Override
public Future<List<V>> radiusAsync(V member, double radius, GeoUnit geoUnit) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.GEORADIUSBYMEMBER, getName(), member, radius, geoUnit);
}
@Override
public Map<V, Double> radiusWithDistance(V member, double radius, GeoUnit geoUnit) {
return get(radiusWithDistanceAsync(member, radius, geoUnit));
}
@Override
public Future<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER", decoder, 2);
return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHDIST");
}
@Override
public Map<V, GeoPosition> radiusWithPosition(V member, double radius, GeoUnit geoUnit) {
return get(radiusWithPositionAsync(member, radius, geoUnit));
}
@Override
public Future<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit) {
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder());
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER", decoder, 2);
return commandExecutor.readAsync(getName(), codec, command, getName(), member, radius, geoUnit, "WITHCOORD");
}
}

@ -0,0 +1,45 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import org.redisson.client.protocol.Encoder;
public class GeoEntryCodec extends StringCodec {
private final ThreadLocal<Integer> pos = new ThreadLocal<Integer>() {
protected Integer initialValue() {
return 0;
};
};
private final Codec delegate;
public GeoEntryCodec(Codec delegate) {
super();
this.delegate = delegate;
}
@Override
public Encoder getValueEncoder() {
Integer p = pos.get() + 1;
pos.set(p);
if (p % 3 == 0) {
return delegate.getValueEncoder();
}
return super.getValueEncoder();
}
}

@ -19,7 +19,7 @@ import org.redisson.client.protocol.Encoder;
public class ScoredCodec extends StringCodec {
public final Codec delegate;
private final Codec delegate;
public ScoredCodec(Codec delegate) {
super();

@ -18,6 +18,7 @@ package org.redisson.client.handler;
import java.util.List;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.DefaultParamsEncoder;
@ -112,8 +113,11 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
if (inParamType.get(typeIndex) == ValueType.OBJECTS) {
return msg.getCodec().getValueEncoder();
}
if (inParamType.get(typeIndex) == ValueType.BINARY) {
return ByteArrayCodec.INSTANCE.getValueEncoder();
if (inParamType.get(typeIndex) == ValueType.OBJECT) {
return msg.getCodec().getValueEncoder();
}
if (inParamType.get(typeIndex) == ValueType.STRING) {
return StringCodec.INSTANCE.getValueEncoder();
}
throw new IllegalStateException();
}

@ -24,7 +24,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder;
public class RedisCommand<R> {
public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY}
public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY, STRING}
private ValueType outParamType = ValueType.OBJECT;
private List<ValueType> inParamType = Arrays.asList(ValueType.OBJECT);

@ -15,10 +15,11 @@
*/
package org.redisson.client.protocol;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BitSetReplayConvertor;
@ -40,7 +41,7 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder2;
import org.redisson.client.protocol.decoder.FlatNestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
@ -57,6 +58,12 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
public interface RedisCommands {
RedisStrictCommand<Long> GEOADD = new RedisStrictCommand<Long>("GEOADD", 4);
RedisStrictCommand<Long> GEOADD_ENTRIES = new RedisStrictCommand<Long>("GEOADD", 2, ValueType.OBJECTS);
RedisCommand<Double> GEODIST = new RedisCommand<Double>("GEODIST", new DoubleReplayConvertor(), 2, Arrays.asList(ValueType.OBJECT, ValueType.OBJECT, ValueType.STRING));
RedisCommand<List<Object>> GEORADIUS = new RedisCommand<List<Object>>("GEORADIUS", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> GEORADIUSBYMEMBER = new RedisCommand<List<Object>>("GEORADIUSBYMEMBER", new ObjectListReplayDecoder<Object>(), 2);
RedisStrictCommand<Integer> KEYSLOT = new RedisStrictCommand<Integer>("CLUSTER", "KEYSLOT", new IntegerReplayConvertor());
RedisStrictCommand<Boolean> GETBIT = new RedisStrictCommand<Boolean>("GETBIT", new BooleanReplayConvertor());
@ -238,7 +245,7 @@ public interface RedisCommands {
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SLAVES",
new NestedMultiDecoder2(new ObjectMapReplayDecoder(), new ListResultReplayDecoder()), ValueType.OBJECT
new FlatNestedMultiDecoder(new ObjectMapReplayDecoder(), new ListResultReplayDecoder()), ValueType.OBJECT
);
RedisStrictCommand<String> INFO_REPLICATION = new RedisStrictCommand<String>("INFO", "replication", new StringDataDecoder());

@ -19,7 +19,7 @@ public class DoubleReplayConvertor extends SingleConvertor<Double> {
@Override
public Double convert(Object obj) {
if (obj == null) {
if (obj == null || obj.toString().isEmpty()) {
return null;
}
return Double.valueOf(obj.toString());

@ -0,0 +1,44 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class FlatNestedMultiDecoder<T> extends NestedMultiDecoder {
public FlatNestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
super(firstDecoder, secondDecoder);
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return firstDecoder.decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
DecoderState ds = getDecoder(state);
if (paramNum == 0) {
ds.resetDecoderIndex();
}
return firstDecoder.isApplicable(paramNum, state);
}
}

@ -0,0 +1,58 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class GeoDistanceDecoder implements MultiDecoder<List<Object>> {
private final ThreadLocal<Integer> pos = new ThreadLocal<Integer>();
private final Codec codec;
public GeoDistanceDecoder(Codec codec) {
super();
this.codec = codec;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
if (pos.get() % 2 == 0) {
return codec.getValueDecoder().decode(buf, state);
}
return DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
pos.set(paramNum);
return true;
}
@Override
public List<Object> decode(List<Object> parts, State state) {
return parts;
}
}

@ -0,0 +1,68 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class GeoDistanceMapDecoder implements MultiDecoder<Map<Object, Object>> {
private final ThreadLocal<Integer> pos = new ThreadLocal<Integer>();
private final Codec codec;
public GeoDistanceMapDecoder(Codec codec) {
super();
this.codec = codec;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
System.out.println("1 " + buf.toString(CharsetUtil.UTF_8));
if (pos.get() % 2 == 0) {
return codec.getValueDecoder().decode(buf, state);
}
return DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
pos.set(paramNum);
return true;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
System.out.println(parts);
Map<Object, Object> result = new HashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
result.put(parts.get(i-1), parts.get(i));
}
}
return result;
}
}

@ -0,0 +1,48 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class GeoMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new HashMap<Object, Object>(parts.size());
for (Object object : parts) {
List<Object> vals = ((List<Object>) object);
result.put(vals.get(0), vals.get(1));
}
return result;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -0,0 +1,50 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.core.GeoPosition;
import io.netty.buffer.ByteBuf;
public class GeoPositionDecoder implements MultiDecoder<GeoPosition> {
@Override
public Double decode(ByteBuf buf, State state) throws IOException {
return (Double) DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
@Override
public GeoPosition decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
Double longitude = Double.valueOf(parts.get(0).toString());
Double latitude = Double.valueOf(parts.get(1).toString());
return new GeoPosition(longitude, latitude);
}
}

@ -0,0 +1,62 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class GeoPositionMapDecoder implements MultiDecoder<Map<Object, Object>> {
private final List<Object> args;
public GeoPositionMapDecoder(List<Object> args) {
this.args = args;
}
@Override
public Double decode(ByteBuf buf, State state) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return Collections.emptyMap();
}
Map<Object, Object> result = new HashMap<Object, Object>(parts.size());
for (int index = 0; index < args.size()-1; index++) {
Object value = parts.get(index);
if (value == null || value == Collections.emptyMap()) {
continue;
}
result.put(args.get(index+1), value);
}
return result;
}
}

@ -16,9 +16,6 @@
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import org.redisson.client.handler.State;
@ -29,53 +26,94 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
public static class DecoderState {
Deque<MultiDecoder<?>> decoders;
int decoderIndex;
int flipDecoderIndex;
Deque<MultiDecoder<?>> flipDecoders;
public DecoderState(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
super();
this.decoders = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder));
this.flipDecoders = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder));
public DecoderState() {
}
public Deque<MultiDecoder<?>> getDecoders() {
return decoders;
public int getDecoderIndex() {
return decoderIndex;
}
public Deque<MultiDecoder<?>> getFlipDecoders() {
return flipDecoders;
public void resetDecoderIndex() {
decoderIndex = 0;
}
public void incDecoderIndex() {
decoderIndex++;
}
public int getFlipDecoderIndex() {
return flipDecoderIndex;
}
public void resetFlipDecoderIndex() {
flipDecoderIndex = 0;
}
public void incFlipDecoderIndex() {
flipDecoderIndex++;
}
}
private final MultiDecoder<Object> firstDecoder;
private final MultiDecoder<Object> secondDecoder;
protected final MultiDecoder<Object> firstDecoder;
protected final MultiDecoder<Object> secondDecoder;
private MultiDecoder<Object> thirdDecoder;
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
}
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder, MultiDecoder<Object> thirdDecoder) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
this.thirdDecoder = thirdDecoder;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
DecoderState ds = getDecoder(state);
return ds.getFlipDecoders().peek().decode(buf, state);
MultiDecoder<?> decoder = null;
if (ds.getFlipDecoderIndex() == 2) {
decoder = firstDecoder;
}
if (ds.getFlipDecoderIndex() == 1) {
decoder = secondDecoder;
}
return decoder.decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
DecoderState ds = getDecoder(state);
if (paramNum == 0) {
ds.getFlipDecoders().poll();
ds.incFlipDecoderIndex();
ds.resetDecoderIndex();
}
// used only with thirdDecoder
if (ds.getFlipDecoderIndex() == 3) {
ds.resetFlipDecoderIndex();
ds.incFlipDecoderIndex();
}
MultiDecoder<?> decoder = null;
if (ds.getFlipDecoderIndex() == 2) {
decoder = firstDecoder;
}
if (ds.getFlipDecoderIndex() == 1) {
decoder = secondDecoder;
}
return ds.getFlipDecoders().peek().isApplicable(paramNum, state);
return decoder.isApplicable(paramNum, state);
}
private DecoderState getDecoder(State state) {
protected final DecoderState getDecoder(State state) {
DecoderState ds = state.getDecoderState();
if (ds == null) {
ds = new DecoderState(firstDecoder, secondDecoder);
ds = new DecoderState();
state.setDecoderState(ds);
}
return ds;
@ -83,8 +121,32 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty() && state.getDecoderState() == null) {
MultiDecoder<?> decoder = secondDecoder;
if (thirdDecoder != null) {
decoder = thirdDecoder;
}
return decoder.decode(parts, state);
}
DecoderState ds = getDecoder(state);
return ds.getDecoders().poll().decode(parts, state);
if (parts.isEmpty()) {
ds.resetDecoderIndex();
}
ds.incDecoderIndex();
MultiDecoder<?> decoder = null;
if (ds.getDecoderIndex() == 1) {
decoder = firstDecoder;
}
if (ds.getDecoderIndex() == 2) {
decoder = secondDecoder;
}
if (ds.getDecoderIndex() == 3) {
decoder = thirdDecoder;
}
return decoder.decode(parts, state);
}
}

@ -1,86 +0,0 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class NestedMultiDecoder2<T> implements MultiDecoder<Object> {
private final MultiDecoder<Object> firstDecoder;
private final MultiDecoder<Object> secondDecoder;
public NestedMultiDecoder2(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return firstDecoder.decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
if (paramNum == 0) {
setCounter(state, 0);
}
return firstDecoder.isApplicable(paramNum, state);
}
private Integer getCounter(State state) {
Integer value = state.getDecoderState();
if (value == null) {
return 0;
}
return value;
}
private void setCounter(State state, Integer value) {
state.setDecoderState(value);
}
@Override
public Object decode(List<Object> parts, State state) {
// handle empty result
if (parts.isEmpty() && state.getDecoderState() == null) {
return secondDecoder.decode(parts, state);
}
int counter = getCounter(state);
if (counter == 2) {
counter = 0;
}
counter++;
setCounter(state, counter);
MultiDecoder<?> decoder = null;
if (counter == 1) {
decoder = firstDecoder;
}
if (counter == 2) {
decoder = secondDecoder;
}
return decoder.decode(parts, state);
}
}

@ -16,6 +16,7 @@
package org.redisson.connection.decoder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -45,6 +46,9 @@ public class MapGetAllDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return Collections.emptyMap();
}
Map<Object, Object> result = new HashMap<Object, Object>(parts.size());
for (int index = 0; index < args.size()-1; index++) {
Object value = parts.get(index);

@ -0,0 +1,43 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public class GeoEntry {
private final double longitude;
private final double latitude;
private final Object member;
public GeoEntry(double longitude, double latitude, Object member) {
super();
this.longitude = longitude;
this.latitude = latitude;
this.member = member;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
public Object getMember() {
return member;
}
}

@ -0,0 +1,70 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public class GeoPosition {
private final double longitude;
private final double latitude;
public GeoPosition(double longitude, double latitude) {
super();
this.longitude = longitude;
this.latitude = latitude;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
long temp;
temp = Double.doubleToLongBits(latitude);
result = prime * result + (int) (temp ^ (temp >>> 32));
temp = Double.doubleToLongBits(longitude);
result = prime * result + (int) (temp ^ (temp >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
GeoPosition other = (GeoPosition) obj;
if (Double.doubleToLongBits(latitude) != Double.doubleToLongBits(other.latitude))
return false;
if (Double.doubleToLongBits(longitude) != Double.doubleToLongBits(other.longitude))
return false;
return true;
}
@Override
public String toString() {
return "GeoPosition [longitude=" + longitude + ", latitude=" + latitude + "]";
}
}

@ -0,0 +1,48 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public enum GeoUnit {
METERS {
@Override
public String toString() {
return "m";
}
},
KILOMETERS {
@Override
public String toString() {
return "km";
}
},
MILES {
@Override
public String toString() {
return "mi";
}
},
FEET {
@Override
public String toString() {
return "ft";
}
}
}

@ -0,0 +1,51 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.List;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
* @param <V>
*/
public interface RGeo<V> extends RExpirable, RGeoAsync<V> {
long add(double longitude, double latitude, V member);
long add(GeoEntry... entries);
Double dist(V firstMember, V secondMember, GeoUnit geoUnit);
Map<V, String> hash(V... members);
Map<V, GeoPosition> pos(V... members);
List<V> radius(double longitude, double latitude, double radius, GeoUnit geoUnit);
Map<V, Double> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit);
Map<V, GeoPosition> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit);
List<V> radius(V member, double radius, GeoUnit geoUnit);
Map<V, Double> radiusWithDistance(V member, double radius, GeoUnit geoUnit);
Map<V, GeoPosition> radiusWithPosition(V member, double radius, GeoUnit geoUnit);
}

@ -0,0 +1,53 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.List;
import java.util.Map;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
* @param <V>
*/
public interface RGeoAsync<V> extends RExpirableAsync {
Future<Long> addAsync(double longitude, double latitude, V member);
Future<Long> addAsync(GeoEntry... entries);
Future<Double> distAsync(V firstMember, V secondMember, GeoUnit geoUnit);
Future<Map<V, String>> hashAsync(V... members);
Future<Map<V, GeoPosition>> posAsync(V... members);
Future<List<V>> radiusAsync(double longitude, double latitude, double radius, GeoUnit geoUnit);
Future<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit);
Future<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit);
Future<List<V>> radiusAsync(V member, double radius, GeoUnit geoUnit);
Future<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit);
Future<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit);
}
Loading…
Cancel
Save