draft counter implementation to track if old value is required in pubsub message

Signed-off-by: Tom Erik Støwer <testower@gmail.com>
pull/3531/head
Tom Erik Støwer 4 years ago
parent b25313bcd0
commit d20985b6bd

@ -157,6 +157,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return "jcache_removed_channel:{" + getRawName() + "}";
}
String getUpdatedChannelOldValueRequiredCounter() { return "jcache_updated_old_value_required_counter:{" + getName() + "}"; }
long currentNanoTime() {
if (config.isStatisticsEnabled()) {
return System.nanoTime();
@ -607,16 +609,30 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "end; "
@ -641,7 +657,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "end; "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getUpdatedChannelOldValueRequiredCounter()),
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
RPromise<Boolean> result = waitSync(syncId, res);
@ -2561,7 +2577,17 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return oldValue;
}
private int incrementOldValueRequiredCounter(String counterName) {
return evalWrite(getName(), codec, RedisCommands.EVAL_INTEGER,
"return redis.call('incr', KEYS[1]);",
Arrays.<Object>asList(counterName));
}
private int decrementOldValueRequiredCounter(String counterName) {
return evalWrite(getName(), codec, RedisCommands.EVAL_INTEGER,
"return redis.call('decr', KEYS[1]);",
Arrays.<Object>asList(counterName));
}
@Override
public boolean replace(K key, V value) {
@ -3034,18 +3060,33 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
channelName = getUpdatedSyncChannelName();
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync, true));
if (cacheEntryListenerConfiguration.isOldValueRequired()) {
incrementOldValueRequiredCounter(getUpdatedChannelOldValueRequiredCounter());
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync, cacheEntryListenerConfiguration.isOldValueRequired()));
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1), msg.get(2));
JCacheEntryEvent<K, V> event;
if (cacheEntryListenerConfiguration.isOldValueRequired()) {
event = new JCacheEntryEvent<K, V>(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1), msg.get(2));
} else {
event = new JCacheEntryEvent<K, V>(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1));
}
try {
if (filter == null || filter.evaluate(event)) {
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
((CacheEntryUpdatedListener<K, V>) listener).onUpdated(events);
}
} finally {
sendSync(sync, msg.get(3));
if (cacheEntryListenerConfiguration.isOldValueRequired()) {
sendSync(sync, msg.get(3));
} else {
sendSync(sync, msg.get(2));
}
}
}
});
@ -3088,6 +3129,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
redisson.getTopic(entry.getValue()).removeListener(entry.getKey());
}
}
if (cacheEntryListenerConfiguration.isOldValueRequired()) {
final CacheEntryListener<? super K, ? super V> listener = cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create();
if (CacheEntryUpdatedListener.class.isAssignableFrom(listener.getClass())) {
decrementOldValueRequiredCounter(getUpdatedChannelOldValueRequiredCounter());
}
}
config.removeCacheEntryListenerConfiguration(cacheEntryListenerConfiguration);
}

@ -415,7 +415,7 @@ public class JCacheTest extends BaseTest {
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(2);
String key = "123";
@ -424,6 +424,11 @@ public class JCacheTest extends BaseTest {
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true);
cache.registerCacheEntryListener(listenerConfiguration);
UpdatedListener secondClientListener = new UpdatedListener(latch, key, null, "90");
MutableCacheEntryListenerConfiguration<String, String> secondListenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(secondClientListener), null, false, true);
cache.registerCacheEntryListener(secondListenerConfiguration);
cache.put(key, "80");
Assert.assertNotNull(cache.get(key));
@ -431,7 +436,7 @@ public class JCacheTest extends BaseTest {
latch.await();
//Assert.assertNotNull(cache.get(key));
Assert.assertNotNull(cache.get(key));
cache.close();
runner.stop();

Loading…
Cancel
Save