diff --git a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java index 95f4110d8..89468ecec 100644 --- a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java +++ b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java @@ -100,7 +100,7 @@ public class MapWriteBehindTask { if (options.getWriter() != null) { options.getWriter().write(addedMap); } else { - options.getWriterAsync().write(addedMap); + options.getWriterAsync().write(addedMap).toCompletableFuture().join(); } addedMap.clear(); } diff --git a/redisson/src/main/java/org/redisson/api/MapOptions.java b/redisson/src/main/java/org/redisson/api/MapOptions.java index 1bc6b12dd..689ee2926 100644 --- a/redisson/src/main/java/org/redisson/api/MapOptions.java +++ b/redisson/src/main/java/org/redisson/api/MapOptions.java @@ -19,6 +19,7 @@ import org.redisson.api.map.MapLoader; import org.redisson.api.map.MapLoaderAsync; import org.redisson.api.map.MapWriter; import org.redisson.api.map.MapWriterAsync; +import org.redisson.api.map.RetryableMapWriter; import org.redisson.api.map.RetryableMapWriterAsync; import java.util.concurrent.TimeUnit; @@ -95,7 +96,7 @@ public class MapOptions { * @return MapOptions instance */ public MapOptions writer(MapWriter writer) { - this.writer = writer; + this.writer = new RetryableMapWriter<>(this, writer); return this; } public MapWriter getWriter() { @@ -170,7 +171,16 @@ public class MapOptions { return writerRetryAttempts; } + /** + * Sets max retry attempts for {@link RetryableMapWriter} or {@link RetryableMapWriterAsync} + * + * @param writerRetryAttempts object + * @return MapOptions instance + */ public MapOptions writerRetryAttempts(int writerRetryAttempts) { + if (writerRetryAttempts < 0){ + throw new IllegalArgumentException("writerRetryAttempts must be positive"); + } this.writerRetryAttempts = writerRetryAttempts; return this; } @@ -179,7 +189,17 @@ public class MapOptions { return writerRetryInterval; } + /** + * Sets retry interval for {@link RetryableMapWriter} or {@link RetryableMapWriterAsync} + * + * @param writerRetryInterval object + * @param timeUnit {@link TimeUnit} + * @return MapOptions instance + */ public MapOptions writerRetryInterval(long writerRetryInterval, TimeUnit timeUnit) { + if (writerRetryInterval < 0){ + throw new IllegalArgumentException("writerRetryInterval must be positive"); + } this.writerRetryInterval = timeUnit.toMillis(writerRetryInterval); return this; } diff --git a/redisson/src/main/java/org/redisson/api/map/RetryableMapWriter.java b/redisson/src/main/java/org/redisson/api/map/RetryableMapWriter.java new file mode 100644 index 000000000..c62bca4db --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/RetryableMapWriter.java @@ -0,0 +1,82 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map; + +import org.redisson.api.MapOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class RetryableMapWriter implements MapWriter { + + private static final Logger log = LoggerFactory.getLogger(RetryableMapWriter.class); + + private final MapOptions options; + private final MapWriter mapWriter; + + public RetryableMapWriter(MapOptions options, MapWriter mapWriter) { + this.options = options; + this.mapWriter = mapWriter; + } + + @Override + public void write(Map addedMap) { + //execute at least once + int leftAddAttempts = Math.max(1, options.getWriterRetryAttempts()); + while (leftAddAttempts > 0) { + try { + //do write + mapWriter.write(addedMap); + break; + } catch (Exception exception) { + if (--leftAddAttempts == 0) { + throw exception; + } else { + log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), exception); + try { + Thread.sleep(options.getWriterRetryInterval()); + } catch (InterruptedException ignore) { + } + } + } + } + } + + @Override + public void delete(Collection keys) { + //execute at least once + int leftDeleteAttempts = Math.max(1, options.getWriterRetryAttempts()); + while (leftDeleteAttempts > 0) { + try { + //do delete + mapWriter.delete(keys); + break; + } catch (Exception exception) { + if (--leftDeleteAttempts == 0) { + throw exception; + } else { + log.warn("Unable to delete keys: {}, will retry after {}ms", keys, options.getWriterRetryInterval(), exception); + try { + Thread.sleep(options.getWriterRetryInterval()); + } catch (InterruptedException ignore) { + } + } + } + } + } +} diff --git a/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java b/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java index 50bfcbce2..ac9d4a681 100644 --- a/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java +++ b/redisson/src/main/java/org/redisson/api/map/RetryableMapWriterAsync.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; public class RetryableMapWriterAsync implements MapWriterAsync { @@ -32,9 +31,6 @@ public class RetryableMapWriterAsync implements MapWriterAsync { private final MapOptions options; private final MapWriterAsync mapWriterAsync; - //store entries no need to be retried - private final Map noRetriesForWrite = new ConcurrentHashMap<>(); - public RetryableMapWriterAsync(MapOptions options, MapWriterAsync mapWriterAsync) { this.options = options; this.mapWriterAsync = mapWriterAsync; @@ -42,37 +38,51 @@ public class RetryableMapWriterAsync implements MapWriterAsync { @Override public CompletionStage write(Map addedMap) { - //execute at least once - int leftAddAttempts = Math.max(1, options.getWriterRetryAttempts()); - while (leftAddAttempts > 0) { - try { - //remove successful part - if (!noRetriesForWrite.isEmpty()) { - noRetriesForWrite.forEach(addedMap::remove); - noRetriesForWrite.clear(); - } - - //do write - mapWriterAsync.write(addedMap).toCompletableFuture().join(); - break; - } catch (Exception exception) { - if (--leftAddAttempts == 0) { - throw exception; - } else { - log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), exception); - try { - Thread.sleep(options.getWriterRetryInterval()); - } catch (InterruptedException ignore) { + return CompletableFuture.runAsync(() -> { + //execute at least once + int leftAddAttempts = Math.max(1, options.getWriterRetryAttempts()); + while (leftAddAttempts > 0) { + try { + //do write + mapWriterAsync.write(addedMap).toCompletableFuture().join(); + break; + } catch (Exception exception) { + if (--leftAddAttempts == 0) { + throw exception; + } else { + log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), exception); + try { + Thread.sleep(options.getWriterRetryInterval()); + } catch (InterruptedException ignore) { + } } } } - } - - return CompletableFuture.completedFuture(null); + }); } @Override public CompletionStage delete(Collection keys) { - return mapWriterAsync.delete(keys); + return CompletableFuture.runAsync(() -> { + //execute at least once + int leftDeleteAttempts = Math.max(1, options.getWriterRetryAttempts()); + while (leftDeleteAttempts > 0) { + try { + //do delete + mapWriterAsync.delete(keys).toCompletableFuture().join(); + break; + } catch (Exception exception) { + if (--leftDeleteAttempts == 0) { + throw exception; + } else { + log.warn("Unable to delete keys: {}, will retry after {}ms", keys, options.getWriterRetryInterval(), exception); + try { + Thread.sleep(options.getWriterRetryInterval()); + } catch (InterruptedException ignore) { + } + } + } + } + }); } } diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index cd248e057..885d117ea 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -1442,7 +1442,7 @@ public abstract class BaseMapTest extends BaseTest { } @Test - public void testRetryableWriterSuccessAtLastRetry() throws InterruptedException { + public void testRetryableWriterAsyncSuccessAtLastRetry() throws InterruptedException { //success at last retry int expectedRetryAttempts = 3; AtomicInteger actualRetryTimes = new AtomicInteger(0); @@ -1457,13 +1457,17 @@ public abstract class BaseMapTest extends BaseTest { throw new IllegalStateException("retry"); } store.putAll(map); - //todo writeSuccess(map); }); } @Override public CompletionStage delete(Collection keys) { - return null; + return CompletableFuture.runAsync(()->{ + if (actualRetryTimes.incrementAndGet() < expectedRetryAttempts) { + throw new IllegalStateException("retry"); + } + keys.forEach(store::remove); + }); } }) .writeMode(MapOptions.WriteMode.WRITE_BEHIND) @@ -1471,61 +1475,90 @@ public abstract class BaseMapTest extends BaseTest { .writerRetryInterval(100, TimeUnit.MILLISECONDS); final RMap map = redisson.getMap("test", options); + //do add map.put("1", "11"); Thread.sleep(1400); - + + //assert add Map expectedMap = new HashMap<>(); expectedMap.put("1", "11"); assertThat(store).isEqualTo(expectedMap); - //assert retry times + //assert add retry times + assertThat(actualRetryTimes.get()).isEqualTo(expectedRetryAttempts); + + //do delete + actualRetryTimes.set(0); + map.remove("1"); + Thread.sleep(1400); + + //assert delete + expectedMap.clear(); + assertThat(store).isEqualTo(expectedMap); + + //assert delete retry times assertThat(actualRetryTimes.get()).isEqualTo(expectedRetryAttempts); destroy(map); } - /*@Test - public void testRetryableWriterOnlyRetryFailedPart() throws InterruptedException { - //lastWritingMap only contains the part that needs to be retried - Map lastWritingMap = new HashMap<>(); + @Test + public void testRetryableWriterSuccessAtLastRetry() throws InterruptedException { + //success at last retry + int expectedRetryAttempts = 3; + AtomicInteger actualRetryTimes = new AtomicInteger(0); + Map store = new HashMap<>(); MapOptions options = MapOptions.defaults() - .writerAsync(new MapWriterAsync() { + .writer(new MapWriter() { @Override - public CompletionStage write(Map writingMap) { - lastWritingMap.clear(); - lastWritingMap.putAll(writingMap); - - for (Entry entry : writingMap.entrySet()) { - if (entry.getKey().equals("illegalData")) { - throw new IllegalStateException("illegalData"); - } - //writeSuccess will exclude entry in next retry - //todo writeSuccess(entry); + public void write(Map map) { + if (actualRetryTimes.incrementAndGet() < expectedRetryAttempts) { + throw new IllegalStateException("retry"); } - return CompletableFuture.completedFuture(null); + store.putAll(map); } @Override - public CompletionStage delete(Collection keys) { - return null; + public void delete(Collection keys) { + if (actualRetryTimes.incrementAndGet() < expectedRetryAttempts) { + throw new IllegalStateException("retry"); + } + keys.forEach(store::remove); } }) .writeMode(MapOptions.WriteMode.WRITE_BEHIND) - .writerRetryAttempts(3); + .writerRetryAttempts(expectedRetryAttempts) + .writerRetryInterval(100, TimeUnit.MILLISECONDS); final RMap map = redisson.getMap("test", options); - map.put("22", "11"); - map.put("333", "11"); - map.put("illegalData", "11"); + + //do add + map.put("1", "11"); Thread.sleep(1400); - Map expectedLastWritingMap = new HashMap<>(); - expectedLastWritingMap.put("illegalData", "11"); - //finally, only "illegalData" still needs to be retried but the maximum number of retries is reached - assertThat(lastWritingMap).isEqualTo(expectedLastWritingMap); + //assert add + Map expectedMap = new HashMap<>(); + expectedMap.put("1", "11"); + assertThat(store).isEqualTo(expectedMap); + //assert add retry times + assertThat(actualRetryTimes.get()).isEqualTo(expectedRetryAttempts); + + + //do delete + actualRetryTimes.set(0); + map.remove("1"); + Thread.sleep(1400); + + //assert delete + expectedMap.clear(); + assertThat(store).isEqualTo(expectedMap); + + //assert delete retry times + assertThat(actualRetryTimes.get()).isEqualTo(expectedRetryAttempts); destroy(map); - }*/ - + } + + @Test public void testLoadAllReplaceValues() { Map cache = new HashMap<>();