diff --git a/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java b/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java index 787000e82..eccdef969 100644 --- a/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java @@ -39,7 +39,7 @@ public class RedissonListMultimapCache extends RedissonListMultimap private final RedissonMultimapCache baseCache; - RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { + public RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); if (evictionScheduler != null) { evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); @@ -47,7 +47,7 @@ public class RedissonListMultimapCache extends RedissonListMultimap baseCache = new RedissonMultimapCache(connectionManager, this, getTimeoutSetName(), prefix); } - RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { + public RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); if (evictionScheduler != null) { evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 85b3d0be5..3e0a05d6f 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -29,7 +29,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.redisson.api.LocalCachedMapOptions; @@ -37,27 +36,18 @@ import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy; import org.redisson.api.RFuture; -import org.redisson.api.RListMultimapCache; import org.redisson.api.RLocalCachedMap; -import org.redisson.api.RScoredSortedSet; -import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; -import org.redisson.api.listener.BaseStatusListener; -import org.redisson.api.listener.MessageListener; import org.redisson.cache.Cache; import org.redisson.cache.LFUCacheMap; import org.redisson.cache.LRUCacheMap; +import org.redisson.cache.LocalCacheListener; import org.redisson.cache.LocalCachedMapClear; -import org.redisson.cache.LocalCachedMapDisable; -import org.redisson.cache.LocalCachedMapDisableAck; -import org.redisson.cache.LocalCachedMapDisabledKey; -import org.redisson.cache.LocalCachedMapEnable; import org.redisson.cache.LocalCachedMapInvalidate; import org.redisson.cache.LocalCachedMapUpdate; import org.redisson.cache.LocalCachedMessageCodec; import org.redisson.cache.NoneCacheMap; import org.redisson.cache.ReferenceCacheMap; -import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -73,12 +63,8 @@ import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.Hash; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; @@ -95,8 +81,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R public static final String DISABLED_KEYS_SUFFIX = "disabled-keys"; public static final String DISABLED_ACK_SUFFIX = ":topic"; - private static final Logger log = LoggerFactory.getLogger(RedissonLocalCachedMap.class); - public static class CacheKey implements Serializable { private final byte[] keyHash; @@ -188,16 +172,12 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10); private byte[] instanceId; - private RTopic invalidationTopic; private Cache cache; - private Map disabledKeys = new ConcurrentHashMap(); private int invalidateEntryOnChange; - private int syncListenerId; - private int reconnectionListenerId; - private volatile long lastInvalidate; private SyncStrategy syncStrategy; - private final Codec topicCodec = new LocalCachedMessageCodec(); + private LocalCacheListener listener; + public RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { super(commandExecutor, name, redisson, options); init(name, options, redisson, evictionScheduler); @@ -218,172 +198,27 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) { invalidateEntryOnChange = 2; - evictionScheduler.schedule(getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1)); + evictionScheduler.schedule(listener.getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1)); } cache = createCache(options); - addListeners(name, options, redisson); - } - - private void addListeners(final String name, final LocalCachedMapOptions options, final RedissonClient redisson) { - invalidationTopic = new RedissonTopic(topicCodec, commandExecutor, suffixName(name, TOPIC_SUFFIX)); - - if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) { - reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() { - @Override - public void onSubscribe(String channel) { - if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) { - cache.clear(); - } - if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD - // check if instance has already been used - && lastInvalidate > 0) { - - if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) { - cache.clear(); - return; - } - - isExistsAsync().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - log.error("Can't check existance", future.cause()); - return; - } - - if (!future.getNow()) { - cache.clear(); - return; - } - - RScoredSortedSet logs = redisson.getScoredSortedSet(getUpdatesLogName(), ByteArrayCodec.INSTANCE); - logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true) - .addListener(new FutureListener>() { - @Override - public void operationComplete(Future> future) throws Exception { - if (!future.isSuccess()) { - log.error("Can't load update log", future.cause()); - return; - } - - for (byte[] entry : future.getNow()) { - byte[] keyHash = Arrays.copyOf(entry, 16); - CacheKey key = new CacheKey(keyHash); - cache.remove(key); - } - } - }); - } - }); - - } - } - }); - } - - if (options.getSyncStrategy() != SyncStrategy.NONE) { - syncListenerId = invalidationTopic.addListener(new MessageListener() { - @Override - public void onMessage(String channel, Object msg) { - if (msg instanceof LocalCachedMapDisable) { - LocalCachedMapDisable m = (LocalCachedMapDisable) msg; - String requestId = m.getRequestId(); - Set keysToDisable = new HashSet(); - for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) { - CacheKey key = new CacheKey(keyHash); - keysToDisable.add(key); - } - - disableKeys(requestId, keysToDisable, m.getTimeout()); - - RedissonTopic topic = new RedissonTopic(topicCodec, commandExecutor, suffixName(name, requestId + DISABLED_ACK_SUFFIX)); - topic.publishAsync(new LocalCachedMapDisableAck()); - } - - if (msg instanceof LocalCachedMapEnable) { - LocalCachedMapEnable m = (LocalCachedMapEnable) msg; - for (byte[] keyHash : m.getKeyHashes()) { - CacheKey key = new CacheKey(keyHash); - disabledKeys.remove(key, m.getRequestId()); - } - } - - if (msg instanceof LocalCachedMapClear) { - cache.clear(); - } - - if (msg instanceof LocalCachedMapInvalidate) { - LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg; - if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { - for (byte[] keyHash : invalidateMsg.getKeyHashes()) { - CacheKey key = new CacheKey(keyHash); - cache.remove(key); - } - } - } - - if (msg instanceof LocalCachedMapUpdate) { - LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg; - - for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) { - ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey()); - ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue()); - try { - CacheKey cacheKey = toCacheKey(keyBuf); - Object key = codec.getMapKeyDecoder().decode(keyBuf, null); - Object value = codec.getMapValueDecoder().decode(valueBuf, null); - cachePut(cacheKey, key, value); - } catch (IOException e) { - log.error("Can't decode map entry", e); - } finally { - keyBuf.release(); - valueBuf.release(); - } - } - - } - - if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) { - lastInvalidate = System.currentTimeMillis(); - } - } - }); - - String disabledKeysName = RedissonObject.suffixName(getName(), DISABLED_KEYS_SUFFIX); - RListMultimapCache multimap = redisson.getListMultimapCache(disabledKeysName, getCodec()); + listener = new LocalCacheListener(name, commandExecutor, cache, this, instanceId, codec, options, cacheUpdateLogTime) { - for (LocalCachedMapDisabledKey key : multimap.readAllKeySet()) { - Set keysToDisable = new HashSet(); - for (String hash : multimap.getAll(key)) { - CacheKey cacheKey = new CacheKey(ByteBufUtil.decodeHexDump(hash)); - keysToDisable.add(cacheKey); - } - - disableKeys(key.getRequestId(), keysToDisable, key.getTimeout()); - } - } - } - - private void disableKeys(final String requestId, final Set keys, long timeout) { - for (CacheKey key : keys) { - disabledKeys.put(key, requestId); - cache.remove(key); - } - - commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { @Override - public void run() { - for (CacheKey cacheKey : keys) { - disabledKeys.remove(cacheKey, requestId); - } + protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException { + CacheKey cacheKey = toCacheKey(keyBuf); + Object key = codec.getMapKeyDecoder().decode(keyBuf, null); + Object value = codec.getMapValueDecoder().decode(valueBuf, null); + cachePut(cacheKey, key, value); } - }, timeout, TimeUnit.MILLISECONDS); + + }; + listener.add(); } - + private void cachePut(CacheKey cacheKey, Object key, Object value) { - if (disabledKeys.containsKey(cacheKey)) { + if (listener.isDisabled(cacheKey)) { return; } @@ -471,10 +306,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } - String getUpdatesLogName() { - return prefixName("redisson__cache_updates_log", getName()); - } - protected static byte[] generateId() { byte[] id = new byte[16]; // TODO JDK UPGRADE replace to native ThreadLocalRandom @@ -515,7 +346,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "redis.call('publish', KEYS[2], ARGV[3]); " + "end;" + "return v; ", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), mapKey, mapValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId); } @@ -547,18 +378,13 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "return 0; " + "end; " + "return 1; ", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId); } @Override public void destroy() { - if (syncListenerId != 0) { - invalidationTopic.removeListener(syncListenerId); - } - if (reconnectionListenerId != 0) { - invalidationTopic.removeListener(reconnectionListenerId); - } + listener.remove(); } @Override @@ -580,7 +406,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end;" + "end; " + "return v", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), keyEncoded, msgEncoded, invalidateEntryOnChange, System.currentTimeMillis(), entryId); } @@ -608,7 +434,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "table.insert(result, val);" + "end;" + "return result;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), + Arrays.asList(getName(), listener.getInvalidationTopicName()), params.toArray()); } @@ -639,7 +465,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "table.insert(result, val);" + "end;" + "return result;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), params.toArray()); } @@ -688,7 +514,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end;" + "end;" + "return counter;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), + Arrays.asList(getName(), listener.getInvalidationTopicName()), params.toArray()); } @@ -718,7 +544,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end;" + "end;" + "return counter;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), params.toArray()); } @@ -746,7 +572,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "return 1;" + "end; " + "return 0;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), msgEncoded, invalidateEntryOnChange); } @@ -856,7 +682,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end; " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "end;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), params.toArray()); future.addListener(new FutureListener() { @@ -890,7 +716,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "redis.call('publish', KEYS[2], ARGV[4]); " + "end;" + "return result; ", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); future.addListener(new FutureListener() { @@ -1027,21 +853,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public RFuture clearLocalCacheAsync() { - final RPromise result = new RedissonPromise(); - RFuture future = invalidationTopic.publishAsync(new LocalCachedMapClear()); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - result.trySuccess(null); - } - }); - - return result; + return listener.clearLocalCacheAsync(); } @Override @@ -1123,7 +935,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "else " + "return nil; " + "end", - Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); } @@ -1169,7 +981,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "else " + "return 0; " + "end", - Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); } @@ -1215,7 +1027,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "else " + "return 0 " + "end", - Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + Arrays.asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); } @@ -1262,7 +1074,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public ByteBuf encode(Object value) { try { - return topicCodec.getValueEncoder().encode(value); + return LocalCachedMessageCodec.INSTANCE.getValueEncoder().encode(value); } catch (IOException e) { throw new IllegalArgumentException(e); } diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java new file mode 100644 index 000000000..c5592c15d --- /dev/null +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -0,0 +1,294 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.cache; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.redisson.RedissonListMultimapCache; +import org.redisson.RedissonLocalCachedMap.CacheKey; +import org.redisson.RedissonObject; +import org.redisson.RedissonScoredSortedSet; +import org.redisson.RedissonTopic; +import org.redisson.api.LocalCachedMapOptions; +import org.redisson.api.RFuture; +import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; +import org.redisson.api.LocalCachedMapOptions.SyncStrategy; +import org.redisson.api.RListMultimapCache; +import org.redisson.api.RObject; +import org.redisson.api.RScoredSortedSet; +import org.redisson.api.RTopic; +import org.redisson.api.listener.BaseStatusListener; +import org.redisson.api.listener.MessageListener; +import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +/** + * + * @author Nikita Koksharov + * + */ +public abstract class LocalCacheListener { + + public static final String TOPIC_SUFFIX = "topic"; + public static final String DISABLED_KEYS_SUFFIX = "disabled-keys"; + public static final String DISABLED_ACK_SUFFIX = ":topic"; + + private Map disabledKeys = new ConcurrentHashMap(); + + private static final Logger log = LoggerFactory.getLogger(LocalCacheListener.class); + + private String name; + private CommandAsyncExecutor commandExecutor; + private Cache cache; + private RObject object; + private byte[] instanceId; + private Codec codec; + private LocalCachedMapOptions options; + + private long cacheUpdateLogTime; + private volatile long lastInvalidate; + private RTopic invalidationTopic; + private int syncListenerId; + private int reconnectionListenerId; + + public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor, Cache cache, + RObject object, byte[] instanceId, Codec codec, LocalCachedMapOptions options, long cacheUpdateLogTime) { + super(); + this.name = name; + this.commandExecutor = commandExecutor; + this.cache = cache; + this.object = object; + this.instanceId = instanceId; + this.codec = codec; + this.options = options; + this.cacheUpdateLogTime = cacheUpdateLogTime; + } + + public boolean isDisabled(Object key) { + return disabledKeys.containsKey(key); + } + + public void add() { + invalidationTopic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName()); + + if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) { + reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() { + @Override + public void onSubscribe(String channel) { + if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) { + cache.clear(); + } + if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD + // check if instance has already been used + && lastInvalidate > 0) { + + if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) { + cache.clear(); + return; + } + + object.isExistsAsync().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't check existance", future.cause()); + return; + } + + if (!future.getNow()) { + cache.clear(); + return; + } + + RScoredSortedSet logs = new RedissonScoredSortedSet(ByteArrayCodec.INSTANCE, commandExecutor, getUpdatesLogName(), null); + logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true) + .addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't load update log", future.cause()); + return; + } + + for (byte[] entry : future.getNow()) { + byte[] keyHash = Arrays.copyOf(entry, 16); + CacheKey key = new CacheKey(keyHash); + cache.remove(key); + } + } + }); + } + }); + + } + } + }); + } + + if (options.getSyncStrategy() != SyncStrategy.NONE) { + syncListenerId = invalidationTopic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, Object msg) { + if (msg instanceof LocalCachedMapDisable) { + LocalCachedMapDisable m = (LocalCachedMapDisable) msg; + String requestId = m.getRequestId(); + Set keysToDisable = new HashSet(); + for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) { + CacheKey key = new CacheKey(keyHash); + keysToDisable.add(key); + } + + disableKeys(requestId, keysToDisable, m.getTimeout()); + + RedissonTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, + commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX)); + topic.publishAsync(new LocalCachedMapDisableAck()); + } + + if (msg instanceof LocalCachedMapEnable) { + LocalCachedMapEnable m = (LocalCachedMapEnable) msg; + for (byte[] keyHash : m.getKeyHashes()) { + CacheKey key = new CacheKey(keyHash); + disabledKeys.remove(key, m.getRequestId()); + } + } + + if (msg instanceof LocalCachedMapClear) { + cache.clear(); + } + + if (msg instanceof LocalCachedMapInvalidate) { + LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg; + if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { + for (byte[] keyHash : invalidateMsg.getKeyHashes()) { + CacheKey key = new CacheKey(keyHash); + cache.remove(key); + } + } + } + + if (msg instanceof LocalCachedMapUpdate) { + LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg; + + for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) { + ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey()); + ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue()); + try { + updateCache(keyBuf, valueBuf); + } catch (IOException e) { + log.error("Can't decode map entry", e); + } finally { + keyBuf.release(); + valueBuf.release(); + } + } + + } + + if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) { + lastInvalidate = System.currentTimeMillis(); + } + } + + }); + + String disabledKeysName = RedissonObject.suffixName(name, DISABLED_KEYS_SUFFIX); + RListMultimapCache multimap = new RedissonListMultimapCache(null, codec, commandExecutor, disabledKeysName); + + for (LocalCachedMapDisabledKey key : multimap.readAllKeySet()) { + Set keysToDisable = new HashSet(); + for (String hash : multimap.getAll(key)) { + CacheKey cacheKey = new CacheKey(ByteBufUtil.decodeHexDump(hash)); + keysToDisable.add(cacheKey); + } + + disableKeys(key.getRequestId(), keysToDisable, key.getTimeout()); + } + } + } + + public RFuture clearLocalCacheAsync() { + final RPromise result = new RedissonPromise(); + RFuture future = invalidationTopic.publishAsync(new LocalCachedMapClear()); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(null); + } + }); + + return result; + } + + public String getInvalidationTopicName() { + return RedissonObject.suffixName(name, TOPIC_SUFFIX); + } + + protected abstract void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException; + + private void disableKeys(final String requestId, final Set keys, long timeout) { + for (CacheKey key : keys) { + disabledKeys.put(key, requestId); + cache.remove(key); + } + + commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { + @Override + public void run() { + for (CacheKey cacheKey : keys) { + disabledKeys.remove(cacheKey, requestId); + } + } + }, timeout, TimeUnit.MILLISECONDS); + } + + public void remove() { + if (syncListenerId != 0) { + invalidationTopic.removeListener(syncListenerId); + } + if (reconnectionListenerId != 0) { + invalidationTopic.removeListener(reconnectionListenerId); + } + } + + public String getUpdatesLogName() { + return RedissonObject.prefixName("redisson__cache_updates_log", name); + } + +}