From dc6e80ebb980ffa8fa0310e8afa890c963b10ef4 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 16 Apr 2018 16:33:34 +0300 Subject: [PATCH] RMap.fastReplace method added. #1379 --- .../org/redisson/RedissonLocalCachedMap.java | 59 +++++++++++++++++-- .../main/java/org/redisson/RedissonMap.java | 41 +++++++++++++ .../java/org/redisson/RedissonMapCache.java | 32 +++++++++- .../src/main/java/org/redisson/api/RMap.java | 21 ++++++- .../main/java/org/redisson/api/RMapAsync.java | 15 +++++ .../test/java/org/redisson/BaseMapTest.java | 12 ++++ .../java/org/redisson/RedissonMapTest.java | 1 - 7 files changed, 170 insertions(+), 11 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 47e2b3fb1..50ceea08b 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -870,11 +870,58 @@ public class RedissonLocalCachedMap extends RedissonMap implements R mapKeys.toArray()); } + @Override + public RFuture fastReplaceAsync(final K key, final V value) { + RFuture future = super.fastReplaceAsync(key, value); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + if (future.getNow()) { + CacheKey cacheKey = toCacheKey(key); + cachePut(cacheKey, key, value); + } + } + }); + + return future; + } + + @Override + protected RFuture fastReplaceOperationAsync(K key, V value) { + ByteBuf keyState = encodeMapKey(key); + ByteBuf valueState = encodeMapValue(value); + CacheKey cacheKey = toCacheKey(keyState); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); + ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey); + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, + "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + + "if ARGV[3] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end;" + + "if ARGV[3] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end;" + + + "return 1; " + + "else " + + "return 0; " + + "end", + Arrays.asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), + keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); + } + @Override protected RFuture replaceOperationAsync(K key, V value) { - final ByteBuf keyState = encodeMapKey(key); + ByteBuf keyState = encodeMapKey(key); ByteBuf valueState = encodeMapValue(value); - final CacheKey cacheKey = toCacheKey(keyState); + CacheKey cacheKey = toCacheKey(keyState); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey); return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, @@ -920,10 +967,10 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override protected RFuture replaceOperationAsync(K key, V oldValue, V newValue) { - final ByteBuf keyState = encodeMapKey(key); + ByteBuf keyState = encodeMapKey(key); ByteBuf oldValueState = encodeMapValue(oldValue); ByteBuf newValueState = encodeMapValue(newValue); - final CacheKey cacheKey = toCacheKey(keyState); + CacheKey cacheKey = toCacheKey(keyState); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); ByteBuf msg = createSyncMessage(keyState, newValueState, cacheKey); return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, @@ -967,9 +1014,9 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override protected RFuture removeOperationAsync(Object key, Object value) { - final ByteBuf keyState = encodeMapKey(key); + ByteBuf keyState = encodeMapKey(key); ByteBuf valueState = encodeMapValue(value); - final CacheKey cacheKey = toCacheKey(keyState); + CacheKey cacheKey = toCacheKey(keyState); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 26267b7bd..c12a2434f 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -623,6 +623,47 @@ public class RedissonMap extends RedissonExpirable implements RMap { + "end", Collections.singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value)); } + + @Override + public boolean fastReplace(K key, V value) { + return get(fastReplaceAsync(key, value)); + } + + @Override + public RFuture fastReplaceAsync(final K key, final V value) { + checkKey(key); + checkValue(value); + + RFuture future = fastReplaceOperationAsync(key, value); + if (hasNoWriter()) { + return future; + } + + MapWriterTask listener = new MapWriterTask() { + @Override + public void execute() { + options.getWriter().write(key, value); + } + + @Override + protected boolean condition(Future future) { + return future.getNow(); + } + }; + return mapWriterFuture(future, listener); + } + + protected RFuture fastReplaceOperationAsync(final K key, final V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, + "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "return 1; " + + "else " + + "return 0; " + + "end", + Collections.singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value)); + } + public RFuture getOperationAsync(K key) { return commandExecutor.readAsync(getName(key), codec, RedisCommands.HGET, getName(key), encodeMapKey(key)); diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index d5816630e..dca0b53ea 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -1602,6 +1602,37 @@ public class RedissonMapCache extends RedissonMap implements RMapCac System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); } + @Override + protected RFuture fastReplaceOperationAsync(K key, V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('hget', KEYS[1], ARGV[2]); " + + "if value == false then " + + " return 0; " + + "end; " + + "local t, val = struct.unpack('dLc0', value); " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDateScore ~= false then " + + " expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + " local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); " + + " if expireIdle ~= false then " + + " expireDate = math.min(expireDate, tonumber(expireIdle)) " + + " end; " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + " return 0; " + + "end; " + + "local value = struct.pack('dLc0', t, string.len(ARGV[3]), ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[2], value); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + + "return 1; ", + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelNameByKey(key)), + System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); + } + @Override protected RFuture replaceOperationAsync(K key, V value) { return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, @@ -1631,7 +1662,6 @@ public class RedissonMapCache extends RedissonMap implements RMapCac "return val; ", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelNameByKey(key)), System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); - } @Override diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 5a7b9f93e..3ee0369ba 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -219,7 +219,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsynckeys from map by one operation *

- * Works faster than {@link RMap#remove(Object)} but not returning + * Works faster than {@link #remove(Object)} but not returning * the value associated with key *

* If {@link MapWriter} is defined then keysare deleted in write-through mode. @@ -232,7 +232,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncvalue with the specified key. *

- * Works faster than {@link RMap#put(Object, Object)} but not returning + * Works faster than {@link #put(Object, Object)} but not returning * the previous value associated with key *

* If {@link MapWriter} is defined then new map entry is stored in write-through mode. @@ -244,11 +244,26 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncvalue associated with the key. + *

+ * Works faster than {@link #replace(Object, Object)} but not returning + * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if key exists and value was updated. + * false if key doesn't exists and value wasn't updated. + */ + boolean fastReplace(K key, V value); + /** * Associates the specified value with the specified key * only if there is no any association with specifiedkey. *

- * Works faster than {@link RMap#putIfAbsent(Object, Object)} but not returning + * Works faster than {@link #putIfAbsent(Object, Object)} but not returning * the previous value associated with key *

* If {@link MapWriter} is defined then new map entry is stored in write-through mode. diff --git a/redisson/src/main/java/org/redisson/api/RMapAsync.java b/redisson/src/main/java/org/redisson/api/RMapAsync.java index 6775fe60b..72b35c4a2 100644 --- a/redisson/src/main/java/org/redisson/api/RMapAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapAsync.java @@ -123,6 +123,21 @@ public interface RMapAsync extends RExpirableAsync { */ RFuture fastPutAsync(K key, V value); + /** + * Replaces previous value with a new value associated with the key. + *

+ * Works faster than {@link RMap#replaceAsync(Object, Object)} but not returning + * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if key exists and value was updated. + * false if key doesn't exists and value wasn't updated. + */ + RFuture fastReplaceAsync(K key, V value); + /** * Associates the specified value with the specified key * only if there is no any association with specifiedkey. diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index bb6cdd708..a8547cf3a 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -587,6 +587,18 @@ public abstract class BaseMapTest extends BaseTest { assertThat(map.get(1)).isEqualTo(3); Assert.assertEquals(1, map.size()); } + + @Test + public void testFastReplace() throws Exception { + RMap map = getMap("simple"); + map.put(1, 2); + + assertThat(map.fastReplace(1, 3)).isTrue(); + assertThat(map.fastReplace(2, 0)).isFalse(); + + Assert.assertEquals(1, map.size()); + assertThat(map.get(1)).isEqualTo(3); + } @Test public void testEquals() { diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index cfd8b02dc..daca3d6ad 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -12,7 +12,6 @@ import org.junit.Test; import org.redisson.api.MapOptions; import org.redisson.api.RMap; import org.redisson.client.codec.Codec; -import org.redisson.codec.JsonJacksonCodec; public class RedissonMapTest extends BaseMapTest {