RMap.fastReplace method added. #1379

pull/1423/head
Nikita 7 years ago
parent 2cea6711ff
commit dc6e80ebb9

@ -870,11 +870,58 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
mapKeys.toArray());
}
@Override
public RFuture<Boolean> fastReplaceAsync(final K key, final V value) {
RFuture<Boolean> future = super.fastReplaceAsync(key, value);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
if (future.getNow()) {
CacheKey cacheKey = toCacheKey(key);
cachePut(cacheKey, key, value);
}
}
});
return future;
}
@Override
protected RFuture<Boolean> fastReplaceOperationAsync(K key, V value) {
ByteBuf keyState = encodeMapKey(key);
ByteBuf valueState = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}
@Override
protected RFuture<V> replaceOperationAsync(K key, V value) {
final ByteBuf keyState = encodeMapKey(key);
ByteBuf keyState = encodeMapKey(key);
ByteBuf valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
@ -920,10 +967,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
final ByteBuf keyState = encodeMapKey(key);
ByteBuf keyState = encodeMapKey(key);
ByteBuf oldValueState = encodeMapValue(oldValue);
ByteBuf newValueState = encodeMapValue(newValue);
final CacheKey cacheKey = toCacheKey(keyState);
CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = createSyncMessage(keyState, newValueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -967,9 +1014,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
final ByteBuf keyState = encodeMapKey(key);
ByteBuf keyState = encodeMapKey(key);
ByteBuf valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));

@ -623,6 +623,47 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
+ "end",
Collections.<Object>singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value));
}
@Override
public boolean fastReplace(K key, V value) {
return get(fastReplaceAsync(key, value));
}
@Override
public RFuture<Boolean> fastReplaceAsync(final K key, final V value) {
checkKey(key);
checkValue(value);
RFuture<Boolean> future = fastReplaceOperationAsync(key, value);
if (hasNoWriter()) {
return future;
}
MapWriterTask<Boolean> listener = new MapWriterTask<Boolean>() {
@Override
public void execute() {
options.getWriter().write(key, value);
}
@Override
protected boolean condition(Future<Boolean> future) {
return future.getNow();
}
};
return mapWriterFuture(future, listener);
}
protected RFuture<Boolean> fastReplaceOperationAsync(final K key, final V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Collections.<Object>singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value));
}
public RFuture<V> getOperationAsync(K key) {
return commandExecutor.readAsync(getName(key), codec, RedisCommands.HGET, getName(key), encodeMapKey(key));

@ -1602,6 +1602,37 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue));
}
@Override
protected RFuture<Boolean> fastReplaceOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[2]); " +
"if value == false then " +
" return 0; " +
"end; " +
"local t, val = struct.unpack('dLc0', value); " +
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " +
"if expireDateScore ~= false then " +
" expireDate = tonumber(expireDateScore) " +
"end; " +
"if t ~= 0 then " +
" local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); " +
" if expireIdle ~= false then " +
" expireDate = math.min(expireDate, tonumber(expireIdle)) " +
" end; " +
"end; " +
"if expireDate <= tonumber(ARGV[1]) then " +
" return 0; " +
"end; " +
"local value = struct.pack('dLc0', t, string.len(ARGV[3]), ARGV[3]); " +
"redis.call('hset', KEYS[1], ARGV[2], value); " +
"local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); " +
"redis.call('publish', KEYS[4], msg); " +
"return 1; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override
protected RFuture<V> replaceOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
@ -1631,7 +1662,6 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"return val; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override

@ -219,7 +219,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
/**
* Removes <code>keys</code> from map by one operation
* <p>
* Works faster than <code>{@link RMap#remove(Object)}</code> but not returning
* Works faster than <code>{@link #remove(Object)}</code> but not returning
* the value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then <code>keys</code>are deleted in write-through mode.
@ -232,7 +232,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
/**
* Associates the specified <code>value</code> with the specified <code>key</code>.
* <p>
* Works faster than <code>{@link RMap#put(Object, Object)}</code> but not returning
* Works faster than <code>{@link #put(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
@ -244,11 +244,26 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
*/
boolean fastPut(K key, V value);
/**
* Replaces previous value with a new <code>value</code> associated with the <code>key</code>.
* <p>
* Works faster than <code>{@link #replace(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key exists and value was updated.
* <code>false</code> if key doesn't exists and value wasn't updated.
*/
boolean fastReplace(K key, V value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* Works faster than <code>{@link RMap#putIfAbsent(Object, Object)}</code> but not returning
* Works faster than <code>{@link #putIfAbsent(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.

@ -123,6 +123,21 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
*/
RFuture<Boolean> fastPutAsync(K key, V value);
/**
* Replaces previous value with a new <code>value</code> associated with the <code>key</code>.
* <p>
* Works faster than <code>{@link RMap#replaceAsync(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key exists and value was updated.
* <code>false</code> if key doesn't exists and value wasn't updated.
*/
RFuture<Boolean> fastReplaceAsync(K key, V value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.

@ -587,6 +587,18 @@ public abstract class BaseMapTest extends BaseTest {
assertThat(map.get(1)).isEqualTo(3);
Assert.assertEquals(1, map.size());
}
@Test
public void testFastReplace() throws Exception {
RMap<Integer, Integer> map = getMap("simple");
map.put(1, 2);
assertThat(map.fastReplace(1, 3)).isTrue();
assertThat(map.fastReplace(2, 0)).isFalse();
Assert.assertEquals(1, map.size());
assertThat(map.get(1)).isEqualTo(3);
}
@Test
public void testEquals() {

@ -12,7 +12,6 @@ import org.junit.Test;
import org.redisson.api.MapOptions;
import org.redisson.api.RMap;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonJacksonCodec;
public class RedissonMapTest extends BaseMapTest {

Loading…
Cancel
Save