Merge branch 'redisson/master' into remote-service-method-override

pull/766/head
Rui Gu 8 years ago
commit 5f628d1922

@ -415,24 +415,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
Map<CacheKey, CacheValue> cacheMap = new HashMap<CacheKey, CacheValue>(m.size());
for (java.util.Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
CacheKey cacheKey = toCacheKey(entry.getKey());
CacheValue cacheValue = new CacheValue(entry.getKey(), entry.getValue());
cacheMap.put(cacheKey, cacheValue);
}
cache.putAll(cacheMap);
super.putAll(m);
if (invalidateEntryOnChange == 1) {
for (CacheKey cacheKey : cacheMap.keySet()) {
invalidationTopic.publish(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
}
}
}
@Override
public RFuture<Boolean> deleteAsync() {
cache.clear();
@ -754,6 +736,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
params.addAll(msgs);
final RPromise<Void> result = newPromise();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
+ "if ARGV[1] == '1' then "
@ -771,9 +754,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
cacheMap(map);
result.trySuccess(null);
}
});
return future;
return result;
}
@Override

@ -83,6 +83,24 @@ public class RedissonTopic<M> implements RTopic<M> {
return System.identityHashCode(pubSubListener);
}
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.removeAllListeners(name);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
} else {
semaphore.release();
}
}
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);

@ -71,4 +71,9 @@ public interface RTopic<M> extends RTopicAsync<M> {
*/
void removeListener(int listenerId);
/**
* Removes all listeners from this topic
*/
void removeAllListeners();
}

@ -88,6 +88,14 @@ public class PubSubConnectionEntry {
conn.addListener(listener);
}
public boolean removeAllListeners(String channelName) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);
for (RedisPubSubListener<?> listener : listeners) {
removeListener(channelName, listener);
}
return !listeners.isEmpty();
}
// TODO optimize
public boolean removeListener(String channelName, int listenerId) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);

@ -323,7 +323,7 @@ public class RedissonLocalCachedMapTest extends BaseTest {
}
@Test
public void testPutAll() {
public void testPutAll() throws InterruptedException {
Map<Integer, String> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Map<Integer, String> map1 = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
@ -344,6 +344,9 @@ public class RedissonLocalCachedMapTest extends BaseTest {
map1.putAll(joinMap);
// waiting for cache cleanup listeners triggering
Thread.sleep(500);
assertThat(cache.size()).isEqualTo(3);
assertThat(cache1.size()).isEqualTo(3);
}

@ -318,6 +318,22 @@ public class RedissonTopicTest {
redisson.shutdown();
}
@Test
public void testRemoveAllListeners() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic<Message> topic1 = redisson.getTopic("topic1");
for (int i = 0; i < 10; i++) {
topic1.addListener((channel, msg) -> {
Assert.fail();
});
}
topic1 = redisson.getTopic("topic1");
topic1.removeAllListeners();
topic1.publish(new Message("123"));
redisson.shutdown();
}
@Test
public void testLazyUnsubscribe() throws InterruptedException {

Loading…
Cancel
Save