Merge branch 'master' into 3.0.0

pull/802/merge
Nikita 9 years ago
commit b58b014bf6

@ -438,8 +438,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] msgEncoded = encode(new LocalCachedMapClear());
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 and ARGV[2] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "end; ",
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end; "
+ "return 0;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
msgEncoded, invalidateEntryOnChange);
}

@ -688,7 +688,9 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// lock acquired
if (ttl == null) {
unsubscribe(subscribeFuture, currentThreadId);
result.trySuccess(true);
if (!result.trySuccess(true)) {
unlockAsync(currentThreadId);
}
return;
}

@ -20,8 +20,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@ -196,36 +196,12 @@ public class RedissonMultiLock implements Lock {
@Override
public boolean tryLock() {
Map<RLock, RFuture<Boolean>> tryLockFutures = new HashMap<RLock, RFuture<Boolean>>(locks.size());
for (RLock lock : locks) {
tryLockFutures.put(lock, lock.tryLockAsync());
}
return sync(tryLockFutures);
}
protected boolean sync(Map<RLock, RFuture<Boolean>> tryLockFutures) {
List<RLock> lockedLocks = new ArrayList<RLock>(tryLockFutures.size());
RuntimeException latestException = null;
for (Entry<RLock, RFuture<Boolean>> entry : tryLockFutures.entrySet()) {
try {
if (entry.getValue().syncUninterruptibly().getNow()) {
lockedLocks.add(entry.getKey());
}
} catch (RuntimeException e) {
latestException = e;
}
}
if (lockedLocks.size() < tryLockFutures.size()) {
unlockInner(lockedLocks);
if (latestException != null) {
throw latestException;
}
try {
return tryLock(-1, -1, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
return true;
}
protected void unlockInner(Collection<RLock> locks) {
@ -243,14 +219,71 @@ public class RedissonMultiLock implements Lock {
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
protected int failedLocksLimit() {
return 0;
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
Map<RLock, RFuture<Boolean>> tryLockFutures = new HashMap<RLock, RFuture<Boolean>>(locks.size());
for (RLock lock : locks) {
tryLockFutures.put(lock, lock.tryLockAsync(waitTime, leaseTime, unit));
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = waitTime*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
int failedLocksLimit = failedLocksLimit();
List<RLock> lockedLocks = new ArrayList<RLock>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
lockAcquired = lock.tryLock(unit.convert(remainTime, TimeUnit.MILLISECONDS), newLeaseTime, unit);
}
if (lockAcquired) {
lockedLocks.add(lock);
} else {
if (failedLocksLimit == 0) {
unlockInner(lockedLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
lockedLocks.clear();
} else {
failedLocksLimit--;
}
}
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime < 0) {
unlockInner(lockedLocks);
return false;
}
}
}
return sync(tryLockFutures);
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(lockedLocks.size());
for (RLock rLock : lockedLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}

@ -15,14 +15,10 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import io.netty.util.concurrent.Future;
@ -47,31 +43,12 @@ public class RedissonRedLock extends RedissonMultiLock {
public RedissonRedLock(RLock... locks) {
super(locks);
}
protected boolean sync(Map<RLock, RFuture<Boolean>> tryLockFutures) {
List<RLock> lockedLocks = new ArrayList<RLock>(tryLockFutures.size());
RuntimeException latestException = null;
for (Entry<RLock, RFuture<Boolean>> entry : tryLockFutures.entrySet()) {
try {
if (entry.getValue().syncUninterruptibly().getNow()) {
lockedLocks.add(entry.getKey());
}
} catch (RuntimeException e) {
latestException = e;
}
}
if (lockedLocks.size() < minLocksAmount(locks)) {
unlockInner(lockedLocks);
if (latestException != null) {
throw latestException;
}
return false;
}
return true;
}
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}

@ -28,6 +28,7 @@ import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNullSafeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
@ -239,8 +240,8 @@ public interface RedisCommands {
RedisStrictCommand<Long> DEL = new RedisStrictCommand<Long>("DEL");
RedisStrictCommand<Long> DBSIZE = new RedisStrictCommand<Long>("DBSIZE");
RedisStrictCommand<Boolean> DEL_BOOL = new RedisStrictCommand<Boolean>("DEL", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> DEL_OBJECTS = new RedisStrictCommand<Boolean>("DEL", new BooleanAmountReplayConvertor());
RedisStrictCommand<Boolean> DEL_BOOL = new RedisStrictCommand<Boolean>("DEL", new BooleanNullSafeReplayConvertor());
RedisStrictCommand<Boolean> DEL_OBJECTS = new RedisStrictCommand<Boolean>("DEL", new BooleanNullSafeReplayConvertor());
RedisStrictCommand<Void> DEL_VOID = new RedisStrictCommand<Void>("DEL", new VoidReplayConvertor());
RedisCommand<Object> GET = new RedisCommand<Object>("GET");

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
public class BooleanNullSafeReplayConvertor extends SingleConvertor<Boolean> {
@Override
public Boolean convert(Object obj) {
if (obj == null) {
return false;
}
return Long.valueOf(1).equals(obj) || "OK".equals(obj);
}
}

@ -16,6 +16,7 @@
package org.redisson.client.protocol.decoder;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -34,7 +35,7 @@ public class ObjectMapEntryReplayDecoder implements MultiDecoder<Set<Entry<Objec
@Override
public Set<Entry<Object, Object>> decode(List<Object> parts, State state) {
Map<Object, Object> result = new HashMap<Object, Object>(parts.size()/2);
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
result.put(parts.get(i-1), parts.get(i));

@ -15,7 +15,7 @@
*/
package org.redisson.client.protocol.decoder;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -32,7 +32,7 @@ public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>>
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new HashMap<Object, Object>(parts.size()/2);
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
result.put(parts.get(i-1), parts.get(i));

@ -16,6 +16,7 @@
package org.redisson.client.protocol.decoder;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@ -32,7 +33,7 @@ public class ObjectSetReplayDecoder<T> implements MultiDecoder<Set<T>> {
@Override
public Set<T> decode(List<Object> parts, State state) {
return new HashSet(parts);
return new LinkedHashSet(parts);
}
@Override

@ -76,7 +76,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg);
init(this.config);
Exception lastException = null;
Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try {
@ -97,6 +98,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<RFuture<Collection<RFuture<Void>>>> futures = new ArrayList<RFuture<Collection<RFuture<Void>>>>();
for (ClusterPartition partition : partitions) {
if (partition.isMasterFail()) {
failedMasters.add(partition.getMasterAddr().toString());
continue;
}
RFuture<Collection<RFuture<Void>>> masterFuture = addMasterEntry(partition, cfg);
@ -106,6 +108,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (RFuture<Collection<RFuture<Void>>> masterFuture : futures) {
masterFuture.awaitUninterruptibly();
if (!masterFuture.isSuccess()) {
lastException = masterFuture.cause();
continue;
}
for (RFuture<Void> future : masterFuture.getNow()) {
@ -124,12 +127,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (lastPartitions.isEmpty()) {
stopThreads();
throw new RedisConnectionException("Can't connect to servers!", lastException);
if (failedMasters.isEmpty()) {
throw new RedisConnectionException("Can't connect to servers!", lastException);
} else {
throw new RedisConnectionException("Can't connect to servers! Failed masters according to cluster status: " + failedMasters, lastException);
}
}
if (lastPartitions.size() != MAX_SLOT) {
stopThreads();
throw new RedisConnectionException("Not all slots are covered! Only " + lastPartitions.size() + " slots are avaliable", lastException);
if (failedMasters.isEmpty()) {
throw new RedisConnectionException("Not all slots are covered! Only " + lastPartitions.size() + " slots are avaliable", lastException);
} else {
throw new RedisConnectionException("Not all slots are covered! Only " + lastPartitions.size() + " slots are avaliable. Failed masters according to cluster status: " + failedMasters, lastException);
}
}
scheduleClusterChangeCheck(cfg, null);

@ -32,20 +32,24 @@ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
if (message.equals(unlockMessage)) {
value.getLatch().release();
synchronized (value) {
while (true) {
while (true) {
Runnable runnableToExecute = null;
synchronized (value) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnable.run();
runnableToExecute = runnable;
} else {
value.addListener(runnable);
return;
}
} else {
return;
}
}
if (runnableToExecute != null) {
runnableToExecute.run();
} else {
return;
}
}
}
}

@ -29,20 +29,24 @@ public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
protected void onMessage(RedissonLockEntry value, Long message) {
value.getLatch().release(message.intValue());
synchronized (value) {
while (true) {
while (true) {
Runnable runnableToExecute = null;
synchronized (value) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnable.run();
runnableToExecute = runnable;
} else {
value.addListener(runnable);
return;
}
} else {
return;
}
}
if (runnableToExecute != null) {
runnableToExecute.run();
} else {
return;
}
}
}

@ -18,6 +18,7 @@ import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.misc.Cache;
import mockit.Deencapsulation;
@ -46,6 +47,25 @@ public class RedissonLocalCachedMapTest extends BaseTest {
}
@Test
public void testClearEmpty() {
RLocalCachedMap<Object, Object> localCachedMap = redisson.getLocalCachedMap("udi-test",
LocalCachedMapOptions.defaults());
localCachedMap.clear();
}
@Test
public void testDelete() {
RLocalCachedMap<String, String> localCachedMap = redisson.getLocalCachedMap("udi-test",
LocalCachedMapOptions.defaults());
assertThat(localCachedMap.delete()).isFalse();
localCachedMap.put("1", "2");
assertThat(localCachedMap.delete()).isTrue();
}
@Test
public void testInvalidationOnUpdate() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true);

@ -4,11 +4,14 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
@ -346,6 +349,37 @@ public class RedissonMapTest extends BaseTest {
assertThat(map.size()).isEqualTo(1);
}
@Test
public void testOrdering() {
Map<String, String> map = new LinkedHashMap<String, String>();
// General player data
map.put("name", "123");
map.put("ip", "4124");
map.put("rank", "none");
map.put("tokens", "0");
map.put("coins", "0");
// Arsenal player statistics
map.put("ar_score", "0");
map.put("ar_gameswon", "0");
map.put("ar_gameslost", "0");
map.put("ar_kills", "0");
map.put("ar_deaths", "0");
RMap<String, String> rmap = redisson.getMap("123");
rmap.putAll(map);
assertThat(rmap.keySet()).containsExactlyElementsOf(map.keySet());
assertThat(rmap.readAllKeySet()).containsExactlyElementsOf(map.keySet());
assertThat(rmap.values()).containsExactlyElementsOf(map.values());
assertThat(rmap.readAllValues()).containsExactlyElementsOf(map.values());
assertThat(rmap.entrySet()).containsExactlyElementsOf(map.entrySet());
assertThat(rmap.readAllEntrySet()).containsExactlyElementsOf(map.entrySet());
}
@Test
public void testPutAll() {
Map<Integer, String> map = redisson.getMap("simple");

@ -1,8 +1,11 @@
package org.redisson;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
@ -19,6 +22,50 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonRedLockTest {
@Test
public void testTryLockLeasetime() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance();
RedisProcess redis2 = redisTestMultilockInstance();
RedissonClient client1 = createClient(redis1.getRedisServerAddressAndPort());
RedissonClient client2 = createClient(redis2.getRedisServerAddressAndPort());
RLock lock1 = client1.getLock("lock1");
RLock lock2 = client1.getLock("lock2");
RLock lock3 = client2.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
ExecutorService executor = Executors.newFixedThreadPool(10);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
for (int j = 0; j < 5; j++) {
try {
if (lock.tryLock(4, 2, TimeUnit.SECONDS)) {
int nextValue = counter.get() + 1;
Thread.sleep(1000);
counter.set(nextValue);
lock.unlock();
} else {
j--;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executor.shutdown();
assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue();
assertThat(counter.get()).isEqualTo(50);
assertThat(redis1.stop()).isEqualTo(0);
assertThat(redis2.stop()).isEqualTo(0);
}
@Test
public void testLockFailed() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance();

@ -35,8 +35,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
BaseTest.beforeClass();
if (!RedissonRuntimeEnvironment.isTravis) {
BaseTest.beforeClass();
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
@ -47,17 +47,17 @@ public class RedissonExecutorServiceTest extends BaseTest {
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
BaseTest.afterClass();
if (!RedissonRuntimeEnvironment.isTravis) {
BaseTest.afterClass();
node.shutdown();
}
}
@Before
@Override
public void before() throws IOException, InterruptedException {
super.before();
if (RedissonRuntimeEnvironment.isTravis) {
super.before();
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
@ -69,8 +69,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
@After
@Override
public void after() throws InterruptedException {
super.after();
if (RedissonRuntimeEnvironment.isTravis) {
super.after();
node.shutdown();
}
}

@ -50,23 +50,19 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
@Before
@Override
public void before() throws IOException, InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
super.before();
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
node = RedissonNode.create(nodeConfig);
node.start();
}
super.before();
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
node = RedissonNode.create(nodeConfig);
node.start();
}
@After
@Override
public void after() throws InterruptedException {
if (RedissonRuntimeEnvironment.isTravis) {
super.after();
node.shutdown();
}
super.after();
node.shutdown();
}
@Test

Loading…
Cancel
Save