From 1585c3386632e4c078513e6347e8e4e46d19a1f1 Mon Sep 17 00:00:00 2001 From: zzhlhc Date: Sun, 24 Sep 2023 20:17:44 +0800 Subject: [PATCH] Support retry for WRITE_THROUGH and WRITE_BEHIND: avoid synchronization Signed-off-by: zzhlhc --- .../java/org/redisson/MapWriteBehindTask.java | 7 +- .../main/java/org/redisson/RedissonMap.java | 11 +++ .../api/map/RetryableMapWriterAsync.java | 98 +++++++++++-------- .../test/java/org/redisson/BaseMapTest.java | 4 +- 4 files changed, 69 insertions(+), 51 deletions(-) diff --git a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java index bb58eda7e..89468ecec 100644 --- a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java +++ b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.api.MapOptions; import org.redisson.api.RFuture; import org.redisson.api.RQueue; -import org.redisson.api.map.RetryableMapWriterAsync; import org.redisson.command.CommandAsyncExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,11 +100,7 @@ public class MapWriteBehindTask { if (options.getWriter() != null) { options.getWriter().write(addedMap); } else { - ((RetryableMapWriterAsync) options.getWriterAsync()) - .withRetryManager(commandExecutor.getServiceManager()) - .write(addedMap) - .toCompletableFuture() - .join(); + options.getWriterAsync().write(addedMap).toCompletableFuture().join(); } addedMap.clear(); } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index a43fa87d0..400fb93fb 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -21,6 +21,7 @@ import org.redisson.api.*; import org.redisson.api.MapOptions.WriteMode; import org.redisson.api.listener.MapPutListener; import org.redisson.api.listener.MapRemoveListener; +import org.redisson.api.map.RetryableMapWriterAsync; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -84,6 +85,11 @@ public class RedissonMap extends RedissonExpirable implements RMap { this.writeBehindService = null; writeBehindTask = null; } + if (options != null + && options.getWriterRetryAttempts()>1 + && options.getWriterAsync() != null){ + ((RetryableMapWriterAsync) options.getWriterAsync()).setServiceManager(commandExecutor.getServiceManager()); + } } public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name) { @@ -108,6 +114,11 @@ public class RedissonMap extends RedissonExpirable implements RMap { this.writeBehindService = null; writeBehindTask = null; } + if (options != null + && options.getWriterRetryAttempts()>1 + && options.getWriterAsync() != null){ + ((RetryableMapWriterAsync) options.getWriterAsync()).setServiceManager(commandExecutor.getServiceManager()); + } } @Override diff --git a/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java b/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java index fad10e143..2ec3a8ddf 100644 --- a/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java +++ b/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java @@ -21,9 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.LinkedHashMap; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -38,6 +36,10 @@ public class RetryableMapWriterAsync implements MapWriterAsync { private ServiceManager serviceManager; + public void setServiceManager(ServiceManager serviceManager) { + this.serviceManager = serviceManager; + } + public RetryableMapWriterAsync(MapOptions options, MapWriterAsync mapWriterAsync) { this.options = options; this.mapWriterAsync = mapWriterAsync; @@ -45,57 +47,67 @@ public class RetryableMapWriterAsync implements MapWriterAsync { @Override public CompletionStage write(Map addedMap) { - return retryWrite(Math.max(1, options.getWriterRetryAttempts()), new LinkedHashMap<>(addedMap)); + CompletableFuture result = new CompletableFuture<>(); + retryWrite(Math.max(1, options.getWriterRetryAttempts()), addedMap, result); + return result; } - private CompletableFuture retryWrite(int leftAttempts, Map addedMap) { - return CompletableFuture.runAsync(() -> { - try { - //do write - mapWriterAsync.write(addedMap).toCompletableFuture().join(); - } catch (Exception exception) { - if (leftAttempts - 1 == 0) { - throw exception; - } else { - //only need serviceManager when exception happened - Objects.requireNonNull(serviceManager); - log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), exception); - serviceManager.newTimeout(t -> retryWrite(leftAttempts - 1, addedMap).toCompletableFuture().join(), + private void retryWrite(int leftAttempts, Map addedMap, CompletableFuture result) { + mapWriterAsync.write(addedMap).whenComplete((x, e) -> { + if (e == null) { + result.complete(null); + return; + } + + if (leftAttempts - 1 <= 0) { + result.completeExceptionally(e); + return; + } + + if (serviceManager == null) { + log.warn("The serviceManager is null so cannot retry writing keys: {}", addedMap); + result.completeExceptionally(e); + return; + } + + log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), e); + serviceManager.newTimeout(t -> retryWrite(leftAttempts - 1, addedMap, result), options.getWriterRetryInterval(), TimeUnit.MILLISECONDS); } - } - }); + ); } @Override public CompletionStage delete(Collection keys) { - return CompletableFuture.runAsync(() -> { - //execute at least once - int leftDeleteAttempts = Math.max(1, options.getWriterRetryAttempts()); - while (leftDeleteAttempts > 0) { - try { - //do delete - mapWriterAsync.delete(keys).toCompletableFuture().join(); - break; - } catch (Exception exception) { - if (--leftDeleteAttempts == 0) { - throw exception; - } else { - log.warn("Unable to delete keys: {}, will retry after {}ms", keys, options.getWriterRetryInterval(), exception); - try { - Thread.sleep(options.getWriterRetryInterval()); - } catch (InterruptedException ignore) { - } + CompletableFuture result = new CompletableFuture<>(); + retryDelete(Math.max(1, options.getWriterRetryAttempts()), keys, result); + return result; + } + + private void retryDelete(int leftAttempts, Collection keys, CompletableFuture result) { + mapWriterAsync.delete(keys).whenComplete((x, e) -> { + if (e == null) { + result.complete(null); + return; + } + + if (leftAttempts - 1 <= 0) { + result.completeExceptionally(e); + return; + } + + if (serviceManager == null) { + log.warn("The serviceManager is null so cannot retry deleting keys: {}", keys); + result.completeExceptionally(e); + return; } + + log.warn("Unable to delete keys: {}, will retry after {}ms", keys, options.getWriterRetryInterval(), e); + serviceManager.newTimeout(t -> retryDelete(leftAttempts - 1, keys, result), + options.getWriterRetryInterval(), TimeUnit.MILLISECONDS); } - } - }); + ); } - public RetryableMapWriterAsync withRetryManager(ServiceManager serviceManager) { - if (this.serviceManager == null) { - this.serviceManager = serviceManager; - } - return this; - } + } diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index dbaaef4b9..26be4b8d3 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -1477,7 +1477,7 @@ public abstract class BaseMapTest extends BaseTest { final RMap map = redisson.getMap("test", options); //do add map.put("1", "11"); - Thread.sleep(1400); + Thread.sleep(2400); //assert add Map expectedMap = new HashMap<>(); @@ -1490,7 +1490,7 @@ public abstract class BaseMapTest extends BaseTest { //do delete actualRetryTimes.set(0); map.remove("1"); - Thread.sleep(1400); + Thread.sleep(2400); //assert delete expectedMap.clear();