From c1c136d0d661f1a699a1d1543131ae29141870ff Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 19 Dec 2022 09:57:28 +0300 Subject: [PATCH] Fixed - RMapCacheAsync interface misses addListenerAsync() method. #4753 --- .../java/org/redisson/RedissonMapCache.java | 126 ++++++++++-------- .../java/org/redisson/api/RMapCacheAsync.java | 16 ++- 2 files changed, 85 insertions(+), 57 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index a0c96fe46..624ac958b 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -2265,85 +2265,99 @@ public class RedissonMapCache extends RedissonMap implements RMapCac params.toArray()); } - private MapCacheEventCodec.OSType osType; + private volatile MapCacheEventCodec.OSType osType; + private volatile Codec topicCodec; @Override public int addListener(MapEntryListener listener) { - if (listener == null) { - throw new NullPointerException(); - } - + return get(addListenerAsync(listener)); + } + + @Override + public RFuture addListenerAsync(MapEntryListener listener) { + Objects.requireNonNull(listener); + + CompletionStage osTypeFuture = CompletableFuture.completedFuture(osType); if (osType == null) { RFuture> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER); - String os = serverFuture.toCompletableFuture().join().get("os"); - if (os == null || os.contains("Windows")) { - osType = BaseEventCodec.OSType.WINDOWS; - } else if (os.contains("NONSTOP")) { - osType = BaseEventCodec.OSType.HPNONSTOP; - } - } - - if (listener instanceof EntryRemovedListener) { - RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getRemovedChannelName()); - return topic.addListener(List.class, new MessageListener>() { - @Override - public void onMessage(CharSequence channel, List msg) { - EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K) msg.get(0), (V) msg.get(1), null); - ((EntryRemovedListener) listener).onRemoved(event); + osTypeFuture = serverFuture.thenApply(res -> { + String os = res.get("os"); + if (os == null || os.contains("Windows")) { + osType = BaseEventCodec.OSType.WINDOWS; + } else if (os.contains("NONSTOP")) { + osType = BaseEventCodec.OSType.HPNONSTOP; } + topicCodec = new MapCacheEventCodec(codec, osType); + return osType; }); } - if (listener instanceof EntryCreatedListener) { - RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getCreatedChannelName()); - return topic.addListener(List.class, new MessageListener>() { - @Override - public void onMessage(CharSequence channel, List msg) { - EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.CREATED, (K) msg.get(0), (V) msg.get(1), null); - ((EntryCreatedListener) listener).onCreated(event); - } - }); - } + CompletionStage f = osTypeFuture.thenCompose(osType -> { + if (listener instanceof EntryRemovedListener) { + RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getRemovedChannelName()); + return topic.addListenerAsync(List.class, new MessageListener>() { + @Override + public void onMessage(CharSequence channel, List msg) { + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K) msg.get(0), (V) msg.get(1), null); + ((EntryRemovedListener) listener).onRemoved(event); + } + }); + } - if (listener instanceof EntryUpdatedListener) { - RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getUpdatedChannelName()); - return topic.addListener(List.class, new MessageListener>() { - @Override - public void onMessage(CharSequence channel, List msg) { - EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.UPDATED, (K) msg.get(0), (V) msg.get(1), (V) msg.get(2)); - ((EntryUpdatedListener) listener).onUpdated(event); - } - }); - } + if (listener instanceof EntryCreatedListener) { + RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getCreatedChannelName()); + return topic.addListenerAsync(List.class, new MessageListener>() { + @Override + public void onMessage(CharSequence channel, List msg) { + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.CREATED, (K) msg.get(0), (V) msg.get(1), null); + ((EntryCreatedListener) listener).onCreated(event); + } + }); + } - if (listener instanceof EntryExpiredListener) { - RTopic topic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getExpiredChannelName()); - return topic.addListener(List.class, new MessageListener>() { - @Override - public void onMessage(CharSequence channel, List msg) { - EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.EXPIRED, (K) msg.get(0), (V) msg.get(1), null); - ((EntryExpiredListener) listener).onExpired(event); - } - }); - } + if (listener instanceof EntryUpdatedListener) { + RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getUpdatedChannelName()); + return topic.addListenerAsync(List.class, new MessageListener>() { + @Override + public void onMessage(CharSequence channel, List msg) { + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.UPDATED, (K) msg.get(0), (V) msg.get(1), (V) msg.get(2)); + ((EntryUpdatedListener) listener).onUpdated(event); + } + }); + } - throw new IllegalArgumentException("Wrong listener type " + listener.getClass()); + if (listener instanceof EntryExpiredListener) { + RTopic topic = RedissonTopic.createRaw(topicCodec, commandExecutor, getExpiredChannelName()); + return topic.addListenerAsync(List.class, new MessageListener>() { + @Override + public void onMessage(CharSequence channel, List msg) { + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.EXPIRED, (K) msg.get(0), (V) msg.get(1), null); + ((EntryExpiredListener) listener).onExpired(event); + } + }); + } + + CompletableFuture res = new CompletableFuture<>(); + res.completeExceptionally(new IllegalArgumentException("Wrong listener type " + listener.getClass())); + return res; + }); + return new CompletableFutureWrapper<>(f); } @Override public void removeListener(int listenerId) { super.removeListener(listenerId); - - RTopic removedTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getRemovedChannelName()); + + RTopic removedTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getRemovedChannelName()); removedTopic.removeListener(listenerId); - RTopic createdTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getCreatedChannelName()); + RTopic createdTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getCreatedChannelName()); createdTopic.removeListener(listenerId); - RTopic updatedTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getUpdatedChannelName()); + RTopic updatedTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getUpdatedChannelName()); updatedTopic.removeListener(listenerId); - RTopic expiredTopic = RedissonTopic.createRaw(new MapCacheEventCodec(codec, osType), commandExecutor, getExpiredChannelName()); + RTopic expiredTopic = RedissonTopic.createRaw(topicCodec, commandExecutor, getExpiredChannelName()); expiredTopic.removeListener(listenerId); } diff --git a/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java b/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java index 28ea917e5..3b4352cb2 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java @@ -17,6 +17,7 @@ package org.redisson.api; import org.redisson.api.map.MapLoader; import org.redisson.api.map.MapWriter; +import org.redisson.api.map.event.MapEntryListener; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -305,5 +306,18 @@ public interface RMapCacheAsync extends RMapAsync { * -1 if the key exists but has no associated expire. */ RFuture remainTimeToLiveAsync(K key); - + + /** + * Adds map entry listener + * + * @see org.redisson.api.map.event.EntryCreatedListener + * @see org.redisson.api.map.event.EntryUpdatedListener + * @see org.redisson.api.map.event.EntryRemovedListener + * @see org.redisson.api.map.event.EntryExpiredListener + * + * @param listener - entry listener + * @return listener id + */ + RFuture addListenerAsync(MapEntryListener listener); + }