RedissonSortedSet modification with consistency. ConcurrentRedissonSortedSetTest added

pull/6/head
Nikita 11 years ago
parent e42c1a1369
commit 85502a83c2

@ -1,21 +1,33 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.net.URL;
import java.net.URLConnection;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RSortedSet;
@ -29,7 +41,6 @@ import com.lambdaworks.redis.ScoredValue;
*
* @param <V>
*/
// TODO lock up-down scores during adding an element
public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V> {
private static class NaturalComparator<V> implements Comparator<V>, Serializable {
@ -46,6 +57,33 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
public static class NewScore {
private Double leftScore;
private Double rightScore;
private Double score;
public NewScore(Double leftScore, Double rightScore, Double score) {
super();
this.leftScore = leftScore;
this.rightScore = rightScore;
this.score = score;
}
public Double getLeftScore() {
return leftScore;
}
public Double getRightScore() {
return rightScore;
}
public Double getScore() {
return score;
}
}
public static class BinarySearchResult<V> {
private V value;
@ -86,6 +124,10 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
this.connectionManager = connectionManager;
loadComparator();
RedisConnection<Object, Object> conn = connectionManager.connection();
conn.setnx(getCurrentVersionKey(), 0L);
connectionManager.release(conn);
}
private void loadComparator() {
@ -271,25 +313,83 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
}
private String getCurrentVersionKey() {
return "redisson__sortedset__version__" + getName();
}
private Long getCurrentVersion(RedisConnection<Object, Object> simpleConnection) {
return ((Number)simpleConnection.get(getCurrentVersionKey())).longValue();
}
@Override
public boolean add(V value) {
RedisConnection<Object, V> connection = connectionManager.connection();
RedisConnection<Object, Object> simpleConnection = (RedisConnection<Object, Object>)connection;
try {
while (true) {
connection.watch(getComparatorKeyName());
checkComparator(connection);
Long version = getCurrentVersion(simpleConnection);
BinarySearchResult<V> res = binarySearch(value, connection);
if (res.getIndex() < 0) {
double score = calcNewScore(res.getIndex(), connection);
if (!version.equals(getCurrentVersion(simpleConnection))) {
connection.unwatch();
continue;
}
NewScore newScore = calcNewScore(res.getIndex(), connection);
if (!version.equals(getCurrentVersion(simpleConnection))) {
connection.unwatch();
continue;
}
String leftScoreKey = getScoreKeyName(newScore.getLeftScore());
String rightScoreKey = getScoreKeyName(newScore.getRightScore());
if (simpleConnection.setnx(leftScoreKey, 1)) {
if (!version.equals(getCurrentVersion(simpleConnection))) {
connection.unwatch();
connection.del(leftScoreKey);
continue;
}
if (rightScoreKey != null) {
if (!simpleConnection.setnx(rightScoreKey, 1)) {
connection.unwatch();
connection.del(leftScoreKey);
continue;
}
}
} else {
connection.unwatch();
continue;
}
connection.multi();
connection.zadd(getName(), score, value);
connection.zadd(getName(), newScore.getScore(), value);
if (rightScoreKey != null) {
connection.del(leftScoreKey, rightScoreKey);
} else {
connection.del(leftScoreKey);
}
connection.incr(getCurrentVersionKey());
List<Object> re = connection.exec();
if (re.size() == 1) {
Object val = re.iterator().next();
return val != null && ((Number)val).intValue() > 0;
if (re.size() == 3) {
Number val = (Number) re.get(0);
Long delCount = (Long) re.get(1);
if (rightScoreKey != null) {
if (delCount != 2) {
throw new IllegalStateException();
}
} else {
if (delCount != 1) {
throw new IllegalStateException();
}
}
return val != null && val.intValue() > 0;
} else {
checkComparator(connection);
}
@ -303,7 +403,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
}
private void checkComparator(RedisConnection<Object, V> connection) {
private void checkComparator(RedisConnection<Object, ?> connection) {
String comparatorSign = (String) connection.get(getComparatorKeyName());
if (comparatorSign != null) {
String[] vals = comparatorSign.split(":");
@ -327,28 +427,33 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
* @param connection
* @return score for index
*/
public double calcNewScore(int index, RedisConnection<Object, V> connection) {
public NewScore calcNewScore(int index, RedisConnection<Object, V> connection) {
if (index >= 0) {
throw new IllegalStateException("index should be negative, but value is " + index);
}
index = -(index + 1);
Double leftScore = null;
Double rightScore = null;
double score = getScoreAtIndex(index, connection);
if (index == 0) {
if (score < 0) {
score = 1000000;
score = (double) 1000000;
leftScore = score;
} else {
score /= 2;
leftScore = score;
score = score / 2;
}
} else {
double beginScore = getScoreAtIndex(index-1, connection);
leftScore = getScoreAtIndex(index-1, connection);
if (score < 0) {
score = beginScore + 1000000;
score = leftScore + 1000000;
} else {
score = beginScore + (score - beginScore) / 2;
rightScore = score;
score = leftScore + (rightScore - leftScore) / 2;
}
}
return score;
return new NewScore(leftScore, rightScore, score);
}
@Override
@ -359,7 +464,15 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
if (res.getIndex() < 0) {
return false;
}
return connection.zremrangebyscore(getName(), res.getScore(), res.getScore()) > 0;
connection.multi();
connection.zremrangebyscore(getName(), res.getScore(), res.getScore());
connection.incr(getCurrentVersionKey());
List<Object> result = connection.exec();
if (result.size() == 2) {
return ((Number)result.get(0)).longValue() > 0;
} else {
return false;
}
} finally {
connectionManager.release(connection);
}
@ -421,8 +534,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public Comparator<? super V> comparator() {
// TODO Auto-generated method stub
return null;
return comparator;
}
@Override
@ -468,6 +580,13 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
}
private String getScoreKeyName(Double score) {
if (score == null) {
return null;
}
return "redisson__sortedset__score__" + getName() + "__" + score;
}
private String getComparatorKeyName() {
return "redisson__sortedset__comparator__" + getName();
}

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Collection;
@ -8,6 +23,7 @@ import java.util.NoSuchElementException;
import java.util.SortedSet;
import org.redisson.RedissonSortedSet.BinarySearchResult;
import org.redisson.RedissonSortedSet.NewScore;
import org.redisson.connection.ConnectionManager;
import com.lambdaworks.redis.RedisConnection;
@ -127,8 +143,8 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
BinarySearchResult<V> res = redissonSortedSet.binarySearch(e, connection);
if (res.getScore() == null) {
double score = redissonSortedSet.calcNewScore(res.getIndex(), connection);
if (score < tailScore && score > headScore) {
NewScore score = redissonSortedSet.calcNewScore(res.getIndex(), connection);
if (score.getScore() < tailScore && score.getScore() > headScore) {
return redissonSortedSet.add(e);
} else {
throw new IllegalArgumentException("value out of range");
@ -244,12 +260,13 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
public V first() {
RedisConnection<Object, V> connection = connectionManager.connection();
try {
// TODO compare first value with headValue
if (headValue != null) {
BinarySearchResult<V> res = redissonSortedSet.binarySearch(headValue, connection);
if (res.getIndex() < 0) {
double headScore = redissonSortedSet.calcNewScore(res.getIndex(), connection);
NewScore headScore = redissonSortedSet.calcNewScore(res.getIndex(), connection);
double tailScore = getTailScore(connection);
List<V> vals = connection.zrangebyscore(redissonSortedSet.getName(), headScore, tailScore);
List<V> vals = connection.zrangebyscore(redissonSortedSet.getName(), headScore.getScore(), tailScore);
if (vals.isEmpty()) {
throw new NoSuchElementException();
}
@ -267,12 +284,13 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
public V last() {
RedisConnection<Object, V> connection = connectionManager.connection();
try {
// TODO compare last value with headValue
if (tailValue != null) {
BinarySearchResult<V> res = redissonSortedSet.binarySearch(tailValue, connection);
if (res.getIndex() < 0) {
double tailScore = redissonSortedSet.calcNewScore(res.getIndex(), connection);
NewScore tailScore = redissonSortedSet.calcNewScore(res.getIndex(), connection);
double headScore = getHeadScore(connection);
List<V> vals = connection.zrangebyscore(redissonSortedSet.getName(), headScore, tailScore);
List<V> vals = connection.zrangebyscore(redissonSortedSet.getName(), headScore, tailScore.getScore());
if (vals.isEmpty()) {
throw new NoSuchElementException();
}

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Comparator;

@ -0,0 +1,42 @@
package org.redisson;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RSortedSet;
public class ConcurrentRedissonSortedSetTest extends BaseConcurrentTest {
@Test
public void testAdd_SingleInstance() throws InterruptedException {
final String name = "testSingleReplaceOldValue_SingleInstance";
Redisson r = Redisson.create();
RSortedSet<Integer> map = r.getSortedSet(name);
map.clear();
int length = 1000;
final AtomicInteger counter = new AtomicInteger();
testSingleInstanceConcurrency(length, new RedissonRunnable() {
@Override
public void run(Redisson redisson) {
RSortedSet<Integer> set = redisson.getSortedSet(name);
int c = counter.incrementAndGet();
Assert.assertTrue(set.add(c));
}
});
List<Integer> elements = new ArrayList<Integer>();
for (int i = 1; i < length+1; i++) {
elements.add(i);
}
MatcherAssert.assertThat(map, Matchers.contains(elements.toArray(new Integer[elements.size()])));
r.shutdown();
}
}

@ -35,11 +35,11 @@ public class RedissonSortedSetTest extends BaseTest {
boolean setRes = set.trySetComparator(Collections.reverseOrder());
Assert.assertTrue(setRes);
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
Assert.assertTrue(set.add(1));
Assert.assertTrue(set.add(2));
Assert.assertTrue(set.add(3));
Assert.assertTrue(set.add(4));
Assert.assertTrue(set.add(5));
MatcherAssert.assertThat(set, Matchers.contains(5, 4, 3, 2, 1));
boolean setRes2 = set.trySetComparator(Collections.reverseOrder(Collections.reverseOrder()));
@ -264,11 +264,11 @@ public class RedissonSortedSetTest extends BaseTest {
public void testDuplicates() {
Set<TestObject> set = redisson.getSortedSet("set");
set.add(new TestObject("1", "2"));
set.add(new TestObject("1", "2"));
set.add(new TestObject("2", "3"));
set.add(new TestObject("3", "4"));
set.add(new TestObject("5", "6"));
Assert.assertTrue(set.add(new TestObject("1", "2")));
Assert.assertFalse(set.add(new TestObject("1", "2")));
Assert.assertTrue(set.add(new TestObject("2", "3")));
Assert.assertTrue(set.add(new TestObject("3", "4")));
Assert.assertTrue(set.add(new TestObject("5", "6")));
Assert.assertEquals(4, set.size());
}

Loading…
Cancel
Save