From 42f27d89bc5096fce7fb4483121b0d5f5d49123f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 22 Nov 2021 05:28:09 +0300 Subject: [PATCH] Fixed - RLocalCachedMap.delete() method clears local cache asynchronously. #3966 --- .../org/redisson/RedissonLocalCachedMap.java | 2 +- .../redisson/cache/LocalCacheListener.java | 15 +- .../redisson/cache/LocalCachedMapClear.java | 8 +- .../cache/LocalCachedMessageCodec.java | 276 +++++++++--------- .../redisson/RedissonLocalCachedMapTest.java | 15 + 5 files changed, 168 insertions(+), 148 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 873b5c2e2..f2be1ce57 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -598,7 +598,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public RFuture deleteAsync() { cache.clear(); - ByteBuf msgEncoded = encode(new LocalCachedMapClear(listener.generateId(), false)); + ByteBuf msgEncoded = encode(new LocalCachedMapClear(instanceId, listener.generateId(), false)); return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then " + "redis.call('publish', KEYS[2], ARGV[1]); " diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java index 5a449f5f1..c56f5fa2b 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -188,11 +188,13 @@ public abstract class LocalCacheListener { if (msg instanceof LocalCachedMapClear) { LocalCachedMapClear clearMsg = (LocalCachedMapClear) msg; - cache.clear(); + if (!Arrays.equals(clearMsg.getExcludedId(), instanceId)) { + cache.clear(); - if (clearMsg.isReleaseSemaphore()) { - RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId()); - semaphore.releaseAsync(); + if (clearMsg.isReleaseSemaphore()) { + RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId()); + semaphore.releaseAsync(); + } } } @@ -249,8 +251,9 @@ public abstract class LocalCacheListener { public RFuture clearLocalCacheAsync() { RPromise result = new RedissonPromise(); + cache.clear(); byte[] id = generateId(); - RFuture future = invalidationTopic.publishAsync(new LocalCachedMapClear(id, true)); + RFuture future = invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true)); future.onComplete((res, e) -> { if (e != null) { result.tryFailure(e); @@ -258,7 +261,7 @@ public abstract class LocalCacheListener { } RSemaphore semaphore = getClearSemaphore(id); - semaphore.tryAcquireAsync(res.intValue(), 50, TimeUnit.SECONDS).onComplete((r, ex) -> { + semaphore.tryAcquireAsync(res.intValue() - 1, 50, TimeUnit.SECONDS).onComplete((r, ex) -> { if (ex != null) { result.tryFailure(ex); return; diff --git a/redisson/src/main/java/org/redisson/cache/LocalCachedMapClear.java b/redisson/src/main/java/org/redisson/cache/LocalCachedMapClear.java index e6081811a..2f08e61d5 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCachedMapClear.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCachedMapClear.java @@ -25,17 +25,23 @@ import java.io.Serializable; @SuppressWarnings("serial") public class LocalCachedMapClear implements Serializable { + private byte[] excludedId; private byte[] requestId; private boolean releaseSemaphore; public LocalCachedMapClear() { } - public LocalCachedMapClear(byte[] requestId, boolean releaseSemaphore) { + public LocalCachedMapClear(byte[] excludedId, byte[] requestId, boolean releaseSemaphore) { + this.excludedId = excludedId; this.requestId = requestId; this.releaseSemaphore = releaseSemaphore; } + public byte[] getExcludedId() { + return excludedId; + } + public boolean isReleaseSemaphore() { return releaseSemaphore; } diff --git a/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java b/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java index 8d18a4a8a..28a946d74 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java @@ -36,162 +36,158 @@ public class LocalCachedMessageCodec extends BaseCodec { public static final LocalCachedMessageCodec INSTANCE = new LocalCachedMessageCodec(); - private final Decoder decoder = new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - byte type = buf.readByte(); - if (type == 0x0) { - byte[] id = new byte[16]; - buf.readBytes(id); - boolean releaseSemaphore = buf.readBoolean(); - return new LocalCachedMapClear(id, releaseSemaphore); - } - - if (type == 0x1) { - byte[] excludedId = new byte[16]; - buf.readBytes(excludedId); - int hashesCount = buf.readInt(); - byte[][] hashes = new byte[hashesCount][]; - for (int i = 0; i < hashesCount; i++) { - byte[] keyHash = new byte[16]; - buf.readBytes(keyHash); - hashes[i] = keyHash; - } - return new LocalCachedMapInvalidate(excludedId, hashes); - } - - if (type == 0x2) { - byte[] excludedId = new byte[16]; - buf.readBytes(excludedId); - List entries = new ArrayList(); - while (true) { - int keyLen = buf.readInt(); - byte[] key = new byte[keyLen]; - buf.readBytes(key); - int valueLen = buf.readInt(); - byte[] value = new byte[valueLen]; - buf.readBytes(value); - entries.add(new LocalCachedMapUpdate.Entry(key, value)); - - if (!buf.isReadable()) { - break; - } - } - return new LocalCachedMapUpdate(excludedId, entries); + private final Decoder decoder = (buf, state) -> { + byte type = buf.readByte(); + if (type == 0x0) { + byte[] excludedId = new byte[16]; + buf.readBytes(excludedId); + byte[] id = new byte[16]; + buf.readBytes(id); + boolean releaseSemaphore = buf.readBoolean(); + return new LocalCachedMapClear(excludedId, id, releaseSemaphore); + } + + if (type == 0x1) { + byte[] excludedId = new byte[16]; + buf.readBytes(excludedId); + int hashesCount = buf.readInt(); + byte[][] hashes = new byte[hashesCount][]; + for (int i = 0; i < hashesCount; i++) { + byte[] keyHash = new byte[16]; + buf.readBytes(keyHash); + hashes[i] = keyHash; } - - if (type == 0x3) { - byte len = buf.readByte(); - CharSequence requestId = buf.readCharSequence(len, CharsetUtil.US_ASCII); - long timeout = buf.readLong(); - int hashesCount = buf.readInt(); - byte[][] hashes = new byte[hashesCount][]; - for (int i = 0; i < hashesCount; i++) { - byte[] keyHash = new byte[16]; - buf.readBytes(keyHash); - hashes[i] = keyHash; + return new LocalCachedMapInvalidate(excludedId, hashes); + } + + if (type == 0x2) { + byte[] excludedId = new byte[16]; + buf.readBytes(excludedId); + List entries = new ArrayList(); + while (true) { + int keyLen = buf.readInt(); + byte[] key = new byte[keyLen]; + buf.readBytes(key); + int valueLen = buf.readInt(); + byte[] value = new byte[valueLen]; + buf.readBytes(value); + entries.add(new LocalCachedMapUpdate.Entry(key, value)); + + if (!buf.isReadable()) { + break; } - return new LocalCachedMapDisable(requestId.toString(), hashes, timeout); - } - - if (type == 0x4) { - return new LocalCachedMapDisableAck(); } + return new LocalCachedMapUpdate(excludedId, entries); + } - if (type == 0x5) { - byte len = buf.readByte(); - CharSequence requestId = buf.readCharSequence(len, CharsetUtil.UTF_8); - int hashesCount = buf.readInt(); - byte[][] hashes = new byte[hashesCount][]; - for (int i = 0; i < hashesCount; i++) { - byte[] keyHash = new byte[16]; - buf.readBytes(keyHash); - hashes[i] = keyHash; - } - return new LocalCachedMapEnable(requestId.toString(), hashes); + if (type == 0x3) { + byte len = buf.readByte(); + CharSequence requestId = buf.readCharSequence(len, CharsetUtil.US_ASCII); + long timeout = buf.readLong(); + int hashesCount = buf.readInt(); + byte[][] hashes = new byte[hashesCount][]; + for (int i = 0; i < hashesCount; i++) { + byte[] keyHash = new byte[16]; + buf.readBytes(keyHash); + hashes[i] = keyHash; } + return new LocalCachedMapDisable(requestId.toString(), hashes, timeout); + } - throw new IllegalArgumentException("Can't parse packet"); + if (type == 0x4) { + return new LocalCachedMapDisableAck(); } + + if (type == 0x5) { + byte len = buf.readByte(); + CharSequence requestId = buf.readCharSequence(len, CharsetUtil.UTF_8); + int hashesCount = buf.readInt(); + byte[][] hashes = new byte[hashesCount][]; + for (int i = 0; i < hashesCount; i++) { + byte[] keyHash = new byte[16]; + buf.readBytes(keyHash); + hashes[i] = keyHash; + } + return new LocalCachedMapEnable(requestId.toString(), hashes); + } + + throw new IllegalArgumentException("Can't parse packet"); }; - private final Encoder encoder = new Encoder() { - - @Override - public ByteBuf encode(Object in) throws IOException { - if (in instanceof LocalCachedMapClear) { - LocalCachedMapClear li = (LocalCachedMapClear) in; - ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1); - result.writeByte(0x0); - result.writeBytes(li.getRequestId()); - result.writeBoolean(li.isReleaseSemaphore()); - return result; - } - if (in instanceof LocalCachedMapInvalidate) { - LocalCachedMapInvalidate li = (LocalCachedMapInvalidate) in; - ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); - result.writeByte(0x1); - - result.writeBytes(li.getExcludedId()); - result.writeInt(li.getKeyHashes().length); - for (int i = 0; i < li.getKeyHashes().length; i++) { - result.writeBytes(li.getKeyHashes()[i]); - } - return result; - } - - if (in instanceof LocalCachedMapUpdate) { - LocalCachedMapUpdate li = (LocalCachedMapUpdate) in; - ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); - result.writeByte(0x2); - - result.writeBytes(li.getExcludedId()); - for (LocalCachedMapUpdate.Entry e : li.getEntries()) { - result.writeInt(e.getKey().length); - result.writeBytes(e.getKey()); - result.writeInt(e.getValue().length); - result.writeBytes(e.getValue()); - } - return result; - } - - if (in instanceof LocalCachedMapDisable) { - LocalCachedMapDisable li = (LocalCachedMapDisable) in; - ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); - result.writeByte(0x3); - - result.writeByte(li.getRequestId().length()); - result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8); - result.writeLong(li.getTimeout()); - result.writeInt(li.getKeyHashes().length); - for (int i = 0; i < li.getKeyHashes().length; i++) { - result.writeBytes(li.getKeyHashes()[i]); - } - return result; + private final Encoder encoder = in -> { + if (in instanceof LocalCachedMapClear) { + LocalCachedMapClear li = (LocalCachedMapClear) in; + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1); + result.writeByte(0x0); + result.writeBytes(li.getExcludedId()); + result.writeBytes(li.getRequestId()); + result.writeBoolean(li.isReleaseSemaphore()); + return result; + } + if (in instanceof LocalCachedMapInvalidate) { + LocalCachedMapInvalidate li = (LocalCachedMapInvalidate) in; + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); + result.writeByte(0x1); + + result.writeBytes(li.getExcludedId()); + result.writeInt(li.getKeyHashes().length); + for (int i = 0; i < li.getKeyHashes().length; i++) { + result.writeBytes(li.getKeyHashes()[i]); } + return result; + } - if (in instanceof LocalCachedMapDisableAck) { - ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1); - result.writeByte(0x4); - return result; + if (in instanceof LocalCachedMapUpdate) { + LocalCachedMapUpdate li = (LocalCachedMapUpdate) in; + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); + result.writeByte(0x2); + + result.writeBytes(li.getExcludedId()); + for (LocalCachedMapUpdate.Entry e : li.getEntries()) { + result.writeInt(e.getKey().length); + result.writeBytes(e.getKey()); + result.writeInt(e.getValue().length); + result.writeBytes(e.getValue()); } + return result; + } - if (in instanceof LocalCachedMapEnable) { - LocalCachedMapEnable li = (LocalCachedMapEnable) in; - ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); - result.writeByte(0x5); - - result.writeByte(li.getRequestId().length()); - result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8); - result.writeInt(li.getKeyHashes().length); - for (int i = 0; i < li.getKeyHashes().length; i++) { - result.writeBytes(li.getKeyHashes()[i]); - } - return result; + if (in instanceof LocalCachedMapDisable) { + LocalCachedMapDisable li = (LocalCachedMapDisable) in; + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); + result.writeByte(0x3); + + result.writeByte(li.getRequestId().length()); + result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8); + result.writeLong(li.getTimeout()); + result.writeInt(li.getKeyHashes().length); + for (int i = 0; i < li.getKeyHashes().length; i++) { + result.writeBytes(li.getKeyHashes()[i]); } + return result; + } + + if (in instanceof LocalCachedMapDisableAck) { + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1); + result.writeByte(0x4); + return result; + } + + if (in instanceof LocalCachedMapEnable) { + LocalCachedMapEnable li = (LocalCachedMapEnable) in; + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); + result.writeByte(0x5); - throw new IllegalArgumentException("Can't encode packet " + in); + result.writeByte(li.getRequestId().length()); + result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8); + result.writeInt(li.getKeyHashes().length); + for (int i = 0; i < li.getKeyHashes().length; i++) { + result.writeBytes(li.getKeyHashes()[i]); + } + return result; } + + throw new IllegalArgumentException("Can't encode packet " + in); }; diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index 81d0e2575..fad795098 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -108,6 +108,21 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { assertThat(cachedMap.containsKey("a")).isFalse(); } + @Test + public void testPutAfterDelete() { + RMap map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults()); + + for (int i = 0; i < 1_000; i++) { + map.delete(); + + map.put("key", "val1"); + map.get("key"); + map.put("key", "val2"); + String val = map.get("key"); + assertThat(val).isEqualTo("val2"); + } + } + @Test public void testMapLoaderGet() { Map cache = new HashMap<>();