Fixed - RLocalCachedMap.delete() method clears local cache asynchronously. #3966

pull/3977/head
Nikita Koksharov 3 years ago
parent 3e4c12cfe4
commit 42f27d89bc

@ -598,7 +598,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
public RFuture<Boolean> deleteAsync() { public RFuture<Boolean> deleteAsync() {
cache.clear(); cache.clear();
ByteBuf msgEncoded = encode(new LocalCachedMapClear(listener.generateId(), false)); ByteBuf msgEncoded = encode(new LocalCachedMapClear(instanceId, listener.generateId(), false));
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then " "if redis.call('del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then "
+ "redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); "

@ -188,11 +188,13 @@ public abstract class LocalCacheListener {
if (msg instanceof LocalCachedMapClear) { if (msg instanceof LocalCachedMapClear) {
LocalCachedMapClear clearMsg = (LocalCachedMapClear) msg; LocalCachedMapClear clearMsg = (LocalCachedMapClear) msg;
cache.clear(); if (!Arrays.equals(clearMsg.getExcludedId(), instanceId)) {
cache.clear();
if (clearMsg.isReleaseSemaphore()) { if (clearMsg.isReleaseSemaphore()) {
RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId()); RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId());
semaphore.releaseAsync(); semaphore.releaseAsync();
}
} }
} }
@ -249,8 +251,9 @@ public abstract class LocalCacheListener {
public RFuture<Void> clearLocalCacheAsync() { public RFuture<Void> clearLocalCacheAsync() {
RPromise<Void> result = new RedissonPromise<Void>(); RPromise<Void> result = new RedissonPromise<Void>();
cache.clear();
byte[] id = generateId(); byte[] id = generateId();
RFuture<Long> future = invalidationTopic.publishAsync(new LocalCachedMapClear(id, true)); RFuture<Long> future = invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true));
future.onComplete((res, e) -> { future.onComplete((res, e) -> {
if (e != null) { if (e != null) {
result.tryFailure(e); result.tryFailure(e);
@ -258,7 +261,7 @@ public abstract class LocalCacheListener {
} }
RSemaphore semaphore = getClearSemaphore(id); RSemaphore semaphore = getClearSemaphore(id);
semaphore.tryAcquireAsync(res.intValue(), 50, TimeUnit.SECONDS).onComplete((r, ex) -> { semaphore.tryAcquireAsync(res.intValue() - 1, 50, TimeUnit.SECONDS).onComplete((r, ex) -> {
if (ex != null) { if (ex != null) {
result.tryFailure(ex); result.tryFailure(ex);
return; return;

@ -25,17 +25,23 @@ import java.io.Serializable;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class LocalCachedMapClear implements Serializable { public class LocalCachedMapClear implements Serializable {
private byte[] excludedId;
private byte[] requestId; private byte[] requestId;
private boolean releaseSemaphore; private boolean releaseSemaphore;
public LocalCachedMapClear() { public LocalCachedMapClear() {
} }
public LocalCachedMapClear(byte[] requestId, boolean releaseSemaphore) { public LocalCachedMapClear(byte[] excludedId, byte[] requestId, boolean releaseSemaphore) {
this.excludedId = excludedId;
this.requestId = requestId; this.requestId = requestId;
this.releaseSemaphore = releaseSemaphore; this.releaseSemaphore = releaseSemaphore;
} }
public byte[] getExcludedId() {
return excludedId;
}
public boolean isReleaseSemaphore() { public boolean isReleaseSemaphore() {
return releaseSemaphore; return releaseSemaphore;
} }

@ -36,162 +36,158 @@ public class LocalCachedMessageCodec extends BaseCodec {
public static final LocalCachedMessageCodec INSTANCE = new LocalCachedMessageCodec(); public static final LocalCachedMessageCodec INSTANCE = new LocalCachedMessageCodec();
private final Decoder<Object> decoder = new Decoder<Object>() { private final Decoder<Object> decoder = (buf, state) -> {
@Override byte type = buf.readByte();
public Object decode(ByteBuf buf, State state) throws IOException { if (type == 0x0) {
byte type = buf.readByte(); byte[] excludedId = new byte[16];
if (type == 0x0) { buf.readBytes(excludedId);
byte[] id = new byte[16]; byte[] id = new byte[16];
buf.readBytes(id); buf.readBytes(id);
boolean releaseSemaphore = buf.readBoolean(); boolean releaseSemaphore = buf.readBoolean();
return new LocalCachedMapClear(id, releaseSemaphore); return new LocalCachedMapClear(excludedId, id, releaseSemaphore);
} }
if (type == 0x1) { if (type == 0x1) {
byte[] excludedId = new byte[16]; byte[] excludedId = new byte[16];
buf.readBytes(excludedId); buf.readBytes(excludedId);
int hashesCount = buf.readInt(); int hashesCount = buf.readInt();
byte[][] hashes = new byte[hashesCount][]; byte[][] hashes = new byte[hashesCount][];
for (int i = 0; i < hashesCount; i++) { for (int i = 0; i < hashesCount; i++) {
byte[] keyHash = new byte[16]; byte[] keyHash = new byte[16];
buf.readBytes(keyHash); buf.readBytes(keyHash);
hashes[i] = keyHash; hashes[i] = keyHash;
}
return new LocalCachedMapInvalidate(excludedId, hashes);
}
if (type == 0x2) {
byte[] excludedId = new byte[16];
buf.readBytes(excludedId);
List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>();
while (true) {
int keyLen = buf.readInt();
byte[] key = new byte[keyLen];
buf.readBytes(key);
int valueLen = buf.readInt();
byte[] value = new byte[valueLen];
buf.readBytes(value);
entries.add(new LocalCachedMapUpdate.Entry(key, value));
if (!buf.isReadable()) {
break;
}
}
return new LocalCachedMapUpdate(excludedId, entries);
} }
return new LocalCachedMapInvalidate(excludedId, hashes);
if (type == 0x3) { }
byte len = buf.readByte();
CharSequence requestId = buf.readCharSequence(len, CharsetUtil.US_ASCII); if (type == 0x2) {
long timeout = buf.readLong(); byte[] excludedId = new byte[16];
int hashesCount = buf.readInt(); buf.readBytes(excludedId);
byte[][] hashes = new byte[hashesCount][]; List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>();
for (int i = 0; i < hashesCount; i++) { while (true) {
byte[] keyHash = new byte[16]; int keyLen = buf.readInt();
buf.readBytes(keyHash); byte[] key = new byte[keyLen];
hashes[i] = keyHash; buf.readBytes(key);
int valueLen = buf.readInt();
byte[] value = new byte[valueLen];
buf.readBytes(value);
entries.add(new LocalCachedMapUpdate.Entry(key, value));
if (!buf.isReadable()) {
break;
} }
return new LocalCachedMapDisable(requestId.toString(), hashes, timeout);
}
if (type == 0x4) {
return new LocalCachedMapDisableAck();
} }
return new LocalCachedMapUpdate(excludedId, entries);
}
if (type == 0x5) { if (type == 0x3) {
byte len = buf.readByte(); byte len = buf.readByte();
CharSequence requestId = buf.readCharSequence(len, CharsetUtil.UTF_8); CharSequence requestId = buf.readCharSequence(len, CharsetUtil.US_ASCII);
int hashesCount = buf.readInt(); long timeout = buf.readLong();
byte[][] hashes = new byte[hashesCount][]; int hashesCount = buf.readInt();
for (int i = 0; i < hashesCount; i++) { byte[][] hashes = new byte[hashesCount][];
byte[] keyHash = new byte[16]; for (int i = 0; i < hashesCount; i++) {
buf.readBytes(keyHash); byte[] keyHash = new byte[16];
hashes[i] = keyHash; buf.readBytes(keyHash);
} hashes[i] = keyHash;
return new LocalCachedMapEnable(requestId.toString(), hashes);
} }
return new LocalCachedMapDisable(requestId.toString(), hashes, timeout);
}
throw new IllegalArgumentException("Can't parse packet"); if (type == 0x4) {
return new LocalCachedMapDisableAck();
} }
if (type == 0x5) {
byte len = buf.readByte();
CharSequence requestId = buf.readCharSequence(len, CharsetUtil.UTF_8);
int hashesCount = buf.readInt();
byte[][] hashes = new byte[hashesCount][];
for (int i = 0; i < hashesCount; i++) {
byte[] keyHash = new byte[16];
buf.readBytes(keyHash);
hashes[i] = keyHash;
}
return new LocalCachedMapEnable(requestId.toString(), hashes);
}
throw new IllegalArgumentException("Can't parse packet");
}; };
private final Encoder encoder = new Encoder() { private final Encoder encoder = in -> {
if (in instanceof LocalCachedMapClear) {
@Override LocalCachedMapClear li = (LocalCachedMapClear) in;
public ByteBuf encode(Object in) throws IOException { ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1);
if (in instanceof LocalCachedMapClear) { result.writeByte(0x0);
LocalCachedMapClear li = (LocalCachedMapClear) in; result.writeBytes(li.getExcludedId());
ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1); result.writeBytes(li.getRequestId());
result.writeByte(0x0); result.writeBoolean(li.isReleaseSemaphore());
result.writeBytes(li.getRequestId()); return result;
result.writeBoolean(li.isReleaseSemaphore()); }
return result; if (in instanceof LocalCachedMapInvalidate) {
} LocalCachedMapInvalidate li = (LocalCachedMapInvalidate) in;
if (in instanceof LocalCachedMapInvalidate) { ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
LocalCachedMapInvalidate li = (LocalCachedMapInvalidate) in; result.writeByte(0x1);
ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
result.writeByte(0x1); result.writeBytes(li.getExcludedId());
result.writeInt(li.getKeyHashes().length);
result.writeBytes(li.getExcludedId()); for (int i = 0; i < li.getKeyHashes().length; i++) {
result.writeInt(li.getKeyHashes().length); result.writeBytes(li.getKeyHashes()[i]);
for (int i = 0; i < li.getKeyHashes().length; i++) {
result.writeBytes(li.getKeyHashes()[i]);
}
return result;
}
if (in instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate li = (LocalCachedMapUpdate) in;
ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
result.writeByte(0x2);
result.writeBytes(li.getExcludedId());
for (LocalCachedMapUpdate.Entry e : li.getEntries()) {
result.writeInt(e.getKey().length);
result.writeBytes(e.getKey());
result.writeInt(e.getValue().length);
result.writeBytes(e.getValue());
}
return result;
}
if (in instanceof LocalCachedMapDisable) {
LocalCachedMapDisable li = (LocalCachedMapDisable) in;
ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
result.writeByte(0x3);
result.writeByte(li.getRequestId().length());
result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8);
result.writeLong(li.getTimeout());
result.writeInt(li.getKeyHashes().length);
for (int i = 0; i < li.getKeyHashes().length; i++) {
result.writeBytes(li.getKeyHashes()[i]);
}
return result;
} }
return result;
}
if (in instanceof LocalCachedMapDisableAck) { if (in instanceof LocalCachedMapUpdate) {
ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1); LocalCachedMapUpdate li = (LocalCachedMapUpdate) in;
result.writeByte(0x4); ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
return result; result.writeByte(0x2);
result.writeBytes(li.getExcludedId());
for (LocalCachedMapUpdate.Entry e : li.getEntries()) {
result.writeInt(e.getKey().length);
result.writeBytes(e.getKey());
result.writeInt(e.getValue().length);
result.writeBytes(e.getValue());
} }
return result;
}
if (in instanceof LocalCachedMapEnable) { if (in instanceof LocalCachedMapDisable) {
LocalCachedMapEnable li = (LocalCachedMapEnable) in; LocalCachedMapDisable li = (LocalCachedMapDisable) in;
ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
result.writeByte(0x5); result.writeByte(0x3);
result.writeByte(li.getRequestId().length()); result.writeByte(li.getRequestId().length());
result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8); result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8);
result.writeInt(li.getKeyHashes().length); result.writeLong(li.getTimeout());
for (int i = 0; i < li.getKeyHashes().length; i++) { result.writeInt(li.getKeyHashes().length);
result.writeBytes(li.getKeyHashes()[i]); for (int i = 0; i < li.getKeyHashes().length; i++) {
} result.writeBytes(li.getKeyHashes()[i]);
return result;
} }
return result;
}
if (in instanceof LocalCachedMapDisableAck) {
ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1);
result.writeByte(0x4);
return result;
}
if (in instanceof LocalCachedMapEnable) {
LocalCachedMapEnable li = (LocalCachedMapEnable) in;
ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
result.writeByte(0x5);
throw new IllegalArgumentException("Can't encode packet " + in); result.writeByte(li.getRequestId().length());
result.writeCharSequence(li.getRequestId(), CharsetUtil.UTF_8);
result.writeInt(li.getKeyHashes().length);
for (int i = 0; i < li.getKeyHashes().length; i++) {
result.writeBytes(li.getKeyHashes()[i]);
}
return result;
} }
throw new IllegalArgumentException("Can't encode packet " + in);
}; };

@ -108,6 +108,21 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cachedMap.containsKey("a")).isFalse(); assertThat(cachedMap.containsKey("a")).isFalse();
} }
@Test
public void testPutAfterDelete() {
RMap<String, String> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
for (int i = 0; i < 1_000; i++) {
map.delete();
map.put("key", "val1");
map.get("key");
map.put("key", "val2");
String val = map.get("key");
assertThat(val).isEqualTo("val2");
}
}
@Test @Test
public void testMapLoaderGet() { public void testMapLoaderGet() {
Map<String, String> cache = new HashMap<>(); Map<String, String> cache = new HashMap<>();

Loading…
Cancel
Save