Fixed - RMapCacheAsync interface misses addListenerAsync() method. #4753

pull/3899/head^2
Nikita Koksharov 2 years ago
parent 9cbce86ecc
commit c1c136d0d6

@ -2265,85 +2265,99 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
params.toArray());
}
private MapCacheEventCodec.OSType osType;
private volatile MapCacheEventCodec.OSType osType;
private volatile Codec topicCodec;
@Override
public int addListener(MapEntryListener listener) {
if (listener == null) {
throw new NullPointerException();
}
return get(addListenerAsync(listener));
}
@Override
public RFuture<Integer> addListenerAsync(MapEntryListener listener) {
Objects.requireNonNull(listener);
CompletionStage<MapCacheEventCodec.OSType> osTypeFuture = CompletableFuture.completedFuture(osType);
if (osType == null) {
RFuture<Map<String, String>> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
String os = serverFuture.toCompletableFuture().join().get("os");
if (os == null || os.contains("Windows")) {
osType = BaseEventCodec.OSType.WINDOWS;
} else if (os.contains("NONSTOP")) {
osType = BaseEventCodec.OSType.HPNONSTOP;
}
}
if (listener instanceof EntryRemovedListener) {
RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getRemovedChannelName());
return topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K) msg.get(0), (V) msg.get(1), null);
((EntryRemovedListener<K, V>) listener).onRemoved(event);
osTypeFuture = serverFuture.thenApply(res -> {
String os = res.get("os");
if (os == null || os.contains("Windows")) {
osType = BaseEventCodec.OSType.WINDOWS;
} else if (os.contains("NONSTOP")) {
osType = BaseEventCodec.OSType.HPNONSTOP;
}
topicCodec = new MapCacheEventCodec(codec, osType);
return osType;
});
}
if (listener instanceof EntryCreatedListener) {
RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getCreatedChannelName());
return topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.CREATED, (K) msg.get(0), (V) msg.get(1), null);
((EntryCreatedListener<K, V>) listener).onCreated(event);
}
});
}
CompletionStage<Integer> f = osTypeFuture.thenCompose(osType -> {
if (listener instanceof EntryRemovedListener) {
RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getRemovedChannelName());
return topic.addListenerAsync(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K) msg.get(0), (V) msg.get(1), null);
((EntryRemovedListener<K, V>) listener).onRemoved(event);
}
});
}
if (listener instanceof EntryUpdatedListener) {
RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getUpdatedChannelName());
return topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.UPDATED, (K) msg.get(0), (V) msg.get(1), (V) msg.get(2));
((EntryUpdatedListener<K, V>) listener).onUpdated(event);
}
});
}
if (listener instanceof EntryCreatedListener) {
RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getCreatedChannelName());
return topic.addListenerAsync(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.CREATED, (K) msg.get(0), (V) msg.get(1), null);
((EntryCreatedListener<K, V>) listener).onCreated(event);
}
});
}
if (listener instanceof EntryExpiredListener) {
RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getExpiredChannelName());
return topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.EXPIRED, (K) msg.get(0), (V) msg.get(1), null);
((EntryExpiredListener<K, V>) listener).onExpired(event);
}
});
}
if (listener instanceof EntryUpdatedListener) {
RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getUpdatedChannelName());
return topic.addListenerAsync(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.UPDATED, (K) msg.get(0), (V) msg.get(1), (V) msg.get(2));
((EntryUpdatedListener<K, V>) listener).onUpdated(event);
}
});
}
throw new IllegalArgumentException("Wrong listener type " + listener.getClass());
if (listener instanceof EntryExpiredListener) {
RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getExpiredChannelName());
return topic.addListenerAsync(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.EXPIRED, (K) msg.get(0), (V) msg.get(1), null);
((EntryExpiredListener<K, V>) listener).onExpired(event);
}
});
}
CompletableFuture<Integer> res = new CompletableFuture<>();
res.completeExceptionally(new IllegalArgumentException("Wrong listener type " + listener.getClass()));
return res;
});
return new CompletableFutureWrapper<>(f);
}
@Override
public void removeListener(int listenerId) {
super.removeListener(listenerId);
RTopic removedTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getRemovedChannelName());
RTopic removedTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getRemovedChannelName());
removedTopic.removeListener(listenerId);
RTopic createdTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getCreatedChannelName());
RTopic createdTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getCreatedChannelName());
createdTopic.removeListener(listenerId);
RTopic updatedTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getUpdatedChannelName());
RTopic updatedTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getUpdatedChannelName());
updatedTopic.removeListener(listenerId);
RTopic expiredTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getExpiredChannelName());
RTopic expiredTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getExpiredChannelName());
expiredTopic.removeListener(listenerId);
}

@ -17,6 +17,7 @@ package org.redisson.api;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.event.MapEntryListener;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -305,5 +306,18 @@ public interface RMapCacheAsync<K, V> extends RMapAsync<K, V> {
* -1 if the key exists but has no associated expire.
*/
RFuture<Long> remainTimeToLiveAsync(K key);
/**
* Adds map entry listener
*
* @see org.redisson.api.map.event.EntryCreatedListener
* @see org.redisson.api.map.event.EntryUpdatedListener
* @see org.redisson.api.map.event.EntryRemovedListener
* @see org.redisson.api.map.event.EntryExpiredListener
*
* @param listener - entry listener
* @return listener id
*/
RFuture<Integer> addListenerAsync(MapEntryListener listener);
}

Loading…
Cancel
Save