refactoring

pull/970/merge
Nikita Koksharov 5 years ago
parent 4e1e97fc73
commit d58f303189

@ -86,7 +86,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException { protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException {
CacheKey cacheKey = toCacheKey(keyBuf); CacheKey cacheKey = localCacheView.toCacheKey(keyBuf);
Object key = codec.getMapKeyDecoder().decode(keyBuf, null); Object key = codec.getMapKeyDecoder().decode(keyBuf, null);
Object value = codec.getMapValueDecoder().decode(valueBuf, null); Object value = codec.getMapValueDecoder().decode(valueBuf, null);
cachePut(cacheKey, key, value); cachePut(cacheKey, key, value);
@ -107,6 +107,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
} }
public LocalCacheView<K, V> getLocalCacheView() {
return localCacheView;
}
private void broadcastLocalCacheStore(V value, ByteBuf mapKey, CacheKey cacheKey) { private void broadcastLocalCacheStore(V value, ByteBuf mapKey, CacheKey cacheKey) {
if (storeMode != LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode != LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return; return;
@ -165,19 +169,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return cache.remove(cacheKey, new CacheValue(key, value)); return cache.remove(cacheKey, new CacheValue(key, value));
} }
public CacheKey toCacheKey(Object key) {
ByteBuf encoded = encodeMapKey(key);
try {
return toCacheKey(encoded);
} finally {
encoded.release();
}
}
private CacheKey toCacheKey(ByteBuf encodedKey) {
return new CacheKey(Hash.hash128toArray(encodedKey));
}
@Override @Override
public RFuture<Integer> sizeAsync() { public RFuture<Integer> sizeAsync() {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
@ -190,7 +181,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<Boolean> containsKeyAsync(Object key) { public RFuture<Boolean> containsKeyAsync(Object key) {
checkKey(key); checkKey(key);
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
if (!cache.containsKey(cacheKey)) { if (!cache.containsKey(cacheKey)) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
@ -220,7 +211,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<V> getAsync(Object key) { public RFuture<V> getAsync(Object key) {
checkKey(key); checkKey(key);
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
CacheValue cacheValue = cache.get(cacheKey); CacheValue cacheValue = cache.get(cacheKey);
if (cacheValue != null && cacheValue.getValue() != null) { if (cacheValue != null && cacheValue.getValue() != null) {
return RedissonPromise.newSucceededFuture((V) cacheValue.getValue()); return RedissonPromise.newSucceededFuture((V) cacheValue.getValue());
@ -258,7 +249,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
protected RFuture<V> putOperationAsync(K key, V value) { protected RFuture<V> putOperationAsync(K key, V value) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cachePut(cacheKey, key, value); CacheValue prevValue = cachePut(cacheKey, key, value);
broadcastLocalCacheStore(value, mapKey, cacheKey); broadcastLocalCacheStore(value, mapKey, cacheKey);
@ -298,7 +289,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) { protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
ByteBuf encodedKey = encodeMapKey(key); ByteBuf encodedKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(encodedKey); CacheKey cacheKey = localCacheView.toCacheKey(encodedKey);
CacheValue prevValue = cachePut(cacheKey, key, value); CacheValue prevValue = cachePut(cacheKey, key, value);
broadcastLocalCacheStore(value, encodedKey, cacheKey); broadcastLocalCacheStore(value, encodedKey, cacheKey);
@ -335,7 +326,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
protected RFuture<V> removeOperationAsync(K key) { protected RFuture<V> removeOperationAsync(K key) {
ByteBuf keyEncoded = encodeMapKey(key); ByteBuf keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
CacheValue value = cache.remove(cacheKey); CacheValue value = cache.remove(cacheKey);
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
@ -381,7 +372,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf keyEncoded = encodeMapKey(k); ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded); params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey); cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded); params.add(msgEncoded);
@ -408,7 +399,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf keyEncoded = encodeMapKey(k); ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded); params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey); cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded); params.add(msgEncoded);
@ -437,7 +428,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf keyEncoded = encodeMapKey(k); ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded); params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey); cache.remove(cacheKey);
} }
@ -458,7 +449,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
long count = 0; long count = 0;
for (K k : keys) { for (K k : keys) {
CacheKey cacheKey = toCacheKey(k); CacheKey cacheKey = localCacheView.toCacheKey(k);
CacheValue val = cache.remove(cacheKey); CacheValue val = cache.remove(cacheKey);
if (val != null) { if (val != null) {
count++; count++;
@ -475,7 +466,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf keyEncoded = encodeMapKey(k); ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded); params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey); cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded); params.add(msgEncoded);
@ -501,7 +492,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf keyEncoded = encodeMapKey(k); ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded); params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey); cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded); params.add(msgEncoded);
@ -530,7 +521,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf keyEncoded = encodeMapKey(k); ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded); params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey); cache.remove(cacheKey);
} }
@ -567,7 +558,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
Set<K> mapKeys = new HashSet<K>(keys); Set<K> mapKeys = new HashSet<K>(keys);
for (Iterator<K> iterator = mapKeys.iterator(); iterator.hasNext();) { for (Iterator<K> iterator = mapKeys.iterator(); iterator.hasNext();) {
K key = iterator.next(); K key = iterator.next();
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
CacheValue value = cache.get(cacheKey); CacheValue value = cache.get(cacheKey);
if (value != null) { if (value != null) {
result.put(key, (V) value.getValue()); result.put(key, (V) value.getValue());
@ -598,7 +589,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private void cacheMap(Map<?, ?> map) { private void cacheMap(Map<?, ?> map) {
for (java.util.Map.Entry<?, ?> entry : map.entrySet()) { for (java.util.Map.Entry<?, ?> entry : map.entrySet()) {
CacheKey cacheKey = toCacheKey(entry.getKey()); CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey());
cachePut(cacheKey, entry.getKey(), entry.getValue()); cachePut(cacheKey, entry.getKey(), entry.getValue());
} }
} }
@ -608,7 +599,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
for (Entry<? extends K, ? extends V> entry : map.entrySet()) { for (Entry<? extends K, ? extends V> entry : map.entrySet()) {
ByteBuf keyEncoded = encodeMapKey(entry.getKey()); ByteBuf keyEncoded = encodeMapKey(entry.getKey());
CacheKey cacheKey = toCacheKey(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cachePut(cacheKey, entry.getKey(), entry.getValue()); cachePut(cacheKey, entry.getKey(), entry.getValue());
broadcastLocalCacheStore(entry.getValue(), keyEncoded, cacheKey); broadcastLocalCacheStore(entry.getValue(), keyEncoded, cacheKey);
@ -627,7 +618,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf mapValue = encodeMapValue(t.getValue()); ByteBuf mapValue = encodeMapValue(t.getValue());
params.add(mapKey); params.add(mapKey);
params.add(mapValue); params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
hashes[i] = cacheKey.getKeyHash(); hashes[i] = cacheKey.getKeyHash();
i++; i++;
} }
@ -691,7 +682,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
protected RFuture<V> addAndGetOperationAsync(K key, Number value) { protected RFuture<V> addAndGetOperationAsync(K key, Number value) {
ByteBuf keyState = encodeMapKey(key); ByteBuf keyState = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyState); CacheKey cacheKey = localCacheView.toCacheKey(keyState);
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
RFuture<V> future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())), RFuture<V> future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
@ -709,7 +700,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
future.onComplete((res, e) -> { future.onComplete((res, e) -> {
if (res != null) { if (res != null) {
CacheKey cKey = toCacheKey(key); CacheKey cKey = localCacheView.toCacheKey(key);
cachePut(cKey, key, res); cachePut(cKey, key, res);
} }
}); });
@ -720,7 +711,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) { public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value); CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value);
if (prevValue == null) { if (prevValue == null) {
broadcastLocalCacheStore(value, mapKey, cacheKey); broadcastLocalCacheStore(value, mapKey, cacheKey);
@ -738,7 +729,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
if (res) { if (res) {
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value); cachePut(cacheKey, key, value);
} }
}); });
@ -822,7 +813,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
for (java.util.Map.Entry<K, V> entry : res.entrySet()) { for (java.util.Map.Entry<K, V> entry : res.entrySet()) {
CacheKey cacheKey = toCacheKey(entry.getKey()); CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey());
cachePut(cacheKey, entry.getKey(), entry.getValue()); cachePut(cacheKey, entry.getKey(), entry.getValue());
} }
result.putAll(res); result.putAll(res);
@ -835,7 +826,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
public void preloadCache() { public void preloadCache() {
for (Entry<K, V> entry : super.entrySet()) { for (Entry<K, V> entry : super.entrySet()) {
CacheKey cacheKey = toCacheKey(entry.getKey()); CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey());
cachePut(cacheKey, entry.getKey(), entry.getValue()); cachePut(cacheKey, entry.getKey(), entry.getValue());
} }
} }
@ -843,7 +834,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
public void preloadCache(int count) { public void preloadCache(int count) {
for (Entry<K, V> entry : super.entrySet(count)) { for (Entry<K, V> entry : super.entrySet(count)) {
CacheKey cacheKey = toCacheKey(entry.getKey()); CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey());
cachePut(cacheKey, entry.getKey(), entry.getValue()); cachePut(cacheKey, entry.getKey(), entry.getValue());
} }
} }
@ -885,7 +876,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
for (java.util.Map.Entry<K, V> entry : res) { for (java.util.Map.Entry<K, V> entry : res) {
CacheKey cacheKey = toCacheKey(entry.getKey()); CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey());
cachePut(cacheKey, entry.getKey(), entry.getValue()); cachePut(cacheKey, entry.getKey(), entry.getValue());
} }
result.addAll(res); result.addAll(res);
@ -922,7 +913,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<Boolean> fastReplaceAsync(K key, V value) { public RFuture<Boolean> fastReplaceAsync(K key, V value) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cacheReplace(cacheKey, key, value); CacheValue prevValue = cacheReplace(cacheKey, key, value);
if (prevValue != null) { if (prevValue != null) {
broadcastLocalCacheStore(value, mapKey, cacheKey); broadcastLocalCacheStore(value, mapKey, cacheKey);
@ -940,7 +931,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
if (res) { if (res) {
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value); cachePut(cacheKey, key, value);
} }
}); });
@ -952,7 +943,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
protected RFuture<Boolean> fastReplaceOperationAsync(K key, V value) { protected RFuture<Boolean> fastReplaceOperationAsync(K key, V value) {
ByteBuf keyState = encodeMapKey(key); ByteBuf keyState = encodeMapKey(key);
ByteBuf valueState = encodeMapValue(value); ByteBuf valueState = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(keyState); CacheKey cacheKey = localCacheView.toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey); ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
@ -979,7 +970,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
protected RFuture<V> replaceOperationAsync(K key, V value) { protected RFuture<V> replaceOperationAsync(K key, V value) {
ByteBuf keyState = encodeMapKey(key); ByteBuf keyState = encodeMapKey(key);
ByteBuf valueState = encodeMapValue(value); ByteBuf valueState = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(keyState); CacheKey cacheKey = localCacheView.toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey); ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
@ -1007,7 +998,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<V> replaceAsync(K key, V value) { public RFuture<V> replaceAsync(K key, V value) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cacheReplace(cacheKey, key, value); CacheValue prevValue = cacheReplace(cacheKey, key, value);
if (prevValue != null) { if (prevValue != null) {
broadcastLocalCacheStore(value, mapKey, cacheKey); broadcastLocalCacheStore(value, mapKey, cacheKey);
@ -1025,7 +1016,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
if (res != null) { if (res != null) {
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value); cachePut(cacheKey, key, value);
} }
}); });
@ -1038,7 +1029,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf keyState = encodeMapKey(key); ByteBuf keyState = encodeMapKey(key);
ByteBuf oldValueState = encodeMapValue(oldValue); ByteBuf oldValueState = encodeMapValue(oldValue);
ByteBuf newValueState = encodeMapValue(newValue); ByteBuf newValueState = encodeMapValue(newValue);
CacheKey cacheKey = toCacheKey(keyState); CacheKey cacheKey = localCacheView.toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = createSyncMessage(keyState, newValueState, cacheKey); ByteBuf msg = createSyncMessage(keyState, newValueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -1063,7 +1054,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<Boolean> replaceAsync(K key, V oldValue, V newValue) { public RFuture<Boolean> replaceAsync(K key, V oldValue, V newValue) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
if (cacheReplace(cacheKey, key, oldValue, newValue)) { if (cacheReplace(cacheKey, key, oldValue, newValue)) {
broadcastLocalCacheStore(newValue, mapKey, cacheKey); broadcastLocalCacheStore(newValue, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
@ -1080,7 +1071,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
if (res) { if (res) {
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, newValue); cachePut(cacheKey, key, newValue);
} }
}); });
@ -1092,7 +1083,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) { protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
ByteBuf keyState = encodeMapKey(key); ByteBuf keyState = encodeMapKey(key);
ByteBuf valueState = encodeMapValue(value); ByteBuf valueState = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(keyState); CacheKey cacheKey = localCacheView.toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
@ -1117,7 +1108,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<Boolean> removeAsync(Object key, Object value) { public RFuture<Boolean> removeAsync(Object key, Object value) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
if (cacheRemove(cacheKey, key, value)) { if (cacheRemove(cacheKey, key, value)) {
broadcastLocalCacheStore((V) value, mapKey, cacheKey); broadcastLocalCacheStore((V) value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
@ -1135,7 +1126,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
if (res) { if (res) {
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
cache.remove(cacheKey); cache.remove(cacheKey);
} }
}); });
@ -1147,7 +1138,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public RFuture<V> putIfAbsentAsync(K key, V value) { public RFuture<V> putIfAbsentAsync(K key, V value) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key); ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value); CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value);
if (prevValue == null) { if (prevValue == null) {
broadcastLocalCacheStore((V) value, mapKey, cacheKey); broadcastLocalCacheStore((V) value, mapKey, cacheKey);
@ -1165,7 +1156,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
if (res == null) { if (res == null) {
CacheKey cacheKey = toCacheKey(key); CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value); cachePut(cacheKey, key, value);
} }
}); });

@ -252,7 +252,7 @@ public class LocalCacheView<K, V> {
} }
} }
private CacheKey toCacheKey(ByteBuf encodedKey) { public CacheKey toCacheKey(ByteBuf encodedKey) {
return new CacheKey(Hash.hash128toArray(encodedKey)); return new CacheKey(Hash.hash128toArray(encodedKey));
} }

@ -348,7 +348,7 @@ public class RedissonTransaction implements RTransaction {
RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>) mapOperation.getMap(); RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>) mapOperation.getMap();
HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec()); HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec());
byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash(); byte[] key = map.getLocalCacheView().toCacheKey(mapOperation.getKey()).getKeyHash();
HashValue value = hashes.get(hashKey); HashValue value = hashes.get(hashKey);
if (value == null) { if (value == null) {
value = new HashValue(); value = new HashValue();
@ -442,7 +442,7 @@ public class RedissonTransaction implements RTransaction {
RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>) mapOperation.getMap(); RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>) mapOperation.getMap();
HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec()); HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec());
byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash(); byte[] key = map.getLocalCacheView().toCacheKey(mapOperation.getKey()).getKeyHash();
HashValue value = hashes.get(hashKey); HashValue value = hashes.get(hashKey);
if (value == null) { if (value == null) {
value = new HashValue(); value = new HashValue();

@ -582,6 +582,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
SimpleKey key1 = new SimpleKey("2"); SimpleKey key1 = new SimpleKey("2");
SimpleValue value1 = new SimpleValue("4"); SimpleValue value1 = new SimpleValue("4");
assertThat(map.fastPutIfAbsent(key1, value1)).isTrue(); assertThat(map.fastPutIfAbsent(key1, value1)).isTrue();
Thread.sleep(50);
assertThat(cache.size()).isEqualTo(2); assertThat(cache.size()).isEqualTo(2);
assertThat(map.get(key1)).isEqualTo(value1); assertThat(map.get(key1)).isEqualTo(value1);
} }

Loading…
Cancel
Save