new RScoredSortedSet object added. #143

pull/243/head
Nikita 10 years ago
parent e0814eb73f
commit af773c3713

@ -44,6 +44,7 @@ import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSet;
import org.redisson.core.RSortedSet;
@ -216,6 +217,10 @@ public class Redisson implements RedissonClient {
return new RedissonSortedSet<V>(commandExecutor, name);
}
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(commandExecutor, name);
}
/**
* Returns topic instance by name.
*

@ -0,0 +1,267 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScoredEntry;
import org.redisson.core.RScoredSortedSet;
import io.netty.util.concurrent.Future;
public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RScoredSortedSet<V> {
public RedissonScoredSortedSet(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@Override
public boolean add(double score, V object) {
return get(addAsync(score, object));
}
@Override
public Future<Boolean> addAsync(double score, V object) {
return commandExecutor.writeAsync(getName(), RedisCommands.ZADD, getName(), BigDecimal.valueOf(score).toPlainString(), object);
}
@Override
public boolean remove(Object object) {
return get(removeAsync(object));
}
@Override
public void clear() {
delete();
}
@Override
public Future<Boolean> removeAsync(Object object) {
return commandExecutor.writeAsync(getName(), RedisCommands.ZREM, getName(), object);
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public int size() {
return get(sizeAsync());
}
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.ZCARD, getName());
}
@Override
public boolean contains(Object object) {
return get(containsAsync(object));
}
@Override
public Future<Boolean> containsAsync(Object o) {
return commandExecutor.readAsync(getName(), RedisCommands.ZSCORE_CONTAINS, getName(), o);
}
@Override
public Double getScore(Object o) {
return get(getScoreAsync(o));
}
@Override
public Future<Double> getScoreAsync(Object o) {
return commandExecutor.readAsync(getName(), RedisCommands.ZSCORE, getName(), o);
}
private ListScanResult<V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(client, getName(), RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private Iterator<V> iter;
private RedisClient client;
private Long iterPos;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null) {
ListScanResult<V> res = scanIterator(null, 0);
client = res.getRedisClient();
iter = res.getValues().iterator();
iterPos = res.getPos();
} else if (!iter.hasNext() && iterPos != 0) {
ListScanResult<V> res = scanIterator(client, iterPos);
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
removeExecuted = false;
return value;
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
iter.remove();
RedissonScoredSortedSet.this.remove(value);
removeExecuted = true;
}
};
}
@Override
public Object[] toArray() {
List<Object> res = (List<Object>) get(valueRangeAsync(0, -1));
return res.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<Object> res = (List<Object>) get(valueRangeAsync(0, -1));
return res.toArray(a);
}
@Override
public boolean containsAll(Collection<?> c) {
return get(containsAllAsync(c));
}
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('zrange', KEYS[1], 0, -1);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('zrem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public boolean removeAll(Collection<?> c) {
return get(removeAllAsync(c));
}
@Override
public boolean retainAll(Collection<?> c) {
return get(retainAllAsync(c));
}
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local s = redis.call('zrange', KEYS[1], 0, -1) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('zrem', KEYS[1], element) "
+ "changed = true "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Double addScore(V object, Number value) {
return get(addScoreAsync(object, value));
}
@Override
public Future<Double> addScoreAsync(V object, Number value) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZINCRBY,
getName(), new BigDecimal(value.toString()).toPlainString(), object);
}
@Override
public Collection<V> valueRange(int startIndex, int endIndex) {
return get(valueRangeAsync(startIndex, endIndex));
}
@Override
public Future<Collection<V>> valueRangeAsync(int startIndex, int endIndex) {
return commandExecutor.readAsync(getName(), RedisCommands.ZRANGE, getName(), startIndex, endIndex);
}
@Override
public Collection<ScoredEntry<V>> entryRange(int startIndex, int endIndex) {
return get(entryRangeAsync(startIndex, endIndex));
}
@Override
public Future<Collection<ScoredEntry<V>>> entryRangeAsync(int startIndex,
int endIndex) {
return commandExecutor.readAsync(getName(), RedisCommands.ZRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES");
}
}

@ -21,7 +21,9 @@ import java.util.Set;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.KeyValueConvertor;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
@ -35,6 +37,9 @@ import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredEntry;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
@ -44,6 +49,16 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
public interface RedisCommands {
RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2);
RedisStrictCommand<Integer> ZCARD = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());
RedisCommand<Boolean> ZSCORE_CONTAINS = new RedisCommand<Boolean>("ZSCORE", new BooleanNotNullReplayConvertor(), 2);
RedisStrictCommand<Double> ZSCORE = new RedisStrictCommand<Double>("ZSCORE", new DoubleReplayConvertor());
RedisCommand<List<Object>> ZRANGE = new RedisCommand<List<Object>>("ZRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ScoredSortedSetScanReplayDecoder()), ValueType.OBJECT);
RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleReplayConvertor());
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY", new StringDataDecoder());
RedisStrictCommand<String> PING = new RedisStrictCommand<String>("PING");

@ -0,0 +1,26 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
public class BooleanNotNullReplayConvertor extends SingleConvertor<Boolean> {
@Override
public Boolean convert(Object obj) {
return obj != null;
}
}

@ -0,0 +1,29 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
public class DoubleReplayConvertor extends SingleConvertor<Double> {
@Override
public Double convert(Object obj) {
if (obj == null) {
return null;
}
return Double.valueOf(obj.toString());
}
}

@ -0,0 +1,68 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
public class ScoredEntry<V> {
private final Double score;
private final V value;
public ScoredEntry(Double score, V value) {
super();
this.score = score;
this.value = value;
}
public V getValue() {
return value;
}
public Double getScore() {
return score;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((score == null) ? 0 : score.hashCode());
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ScoredEntry other = (ScoredEntry) obj;
if (score == null) {
if (other.score != null)
return false;
} else if (!score.equals(other.score))
return false;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
}

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ScoredSortedSetReplayDecoder<T> implements MultiDecoder<List<ScoredEntry<T>>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public List<ScoredEntry<T>> decode(List<Object> parts, State state) {
List<ScoredEntry<T>> result = new ArrayList<ScoredEntry<T>>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new ScoredEntry<T>(((Number)parts.get(i+1)).doubleValue(), (T)parts.get(i)));
}
return result;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class ScoredSortedSetScanReplayDecoder implements MultiDecoder<ListScanResult<Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@Override
public ListScanResult<Object> decode(List<Object> parts, State state) {
List<Object> values = (List<Object>)parts.get(1);
for (int i = 1; i < values.size(); i++) {
values.remove(i);
}
return new ListScanResult<Object>((Long)parts.get(0), values);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}
}

@ -0,0 +1,54 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import org.redisson.client.protocol.decoder.ScoredEntry;
public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<V>, RExpirable {
Double getScore(Object o);
boolean add(double score, V object);
int size();
boolean isEmpty();
boolean contains(Object o);
Object[] toArray();
<T> T[] toArray(T[] a);
boolean remove(Object o);
boolean containsAll(Collection<?> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();
Double addScore(V object, Number value);
Collection<V> valueRange(int startIndex, int endIndex);
Collection<ScoredEntry<V>> entryRange(int startIndex, int endIndex);
}

@ -0,0 +1,48 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import org.redisson.client.protocol.decoder.ScoredEntry;
import io.netty.util.concurrent.Future;
public interface RScoredSortedSetAsync<V> extends RExpirableAsync {
Future<Double> getScoreAsync(Object o);
Future<Boolean> addAsync(double score, V object);
Future<Boolean> removeAsync(V object);
Future<Integer> sizeAsync();
Future<Boolean> containsAsync(Object o);
Future<Boolean> containsAllAsync(Collection<?> c);
Future<Boolean> removeAllAsync(Collection<?> c);
Future<Boolean> retainAllAsync(Collection<?> c);
Future<Double> addScoreAsync(V object, Number value);
Future<Collection<V>> valueRangeAsync(int startIndex, int endIndex);
Future<Collection<ScoredEntry<V>>> entryRangeAsync(int startIndex, int endIndex);
}
Loading…
Cancel
Save