Support retry for WRITE_THROUGH and WRITE_BEHIND: use schedule instead of sleep

Signed-off-by: zzhlhc <zhouzh_zzz@qq.com>
pull/5294/head
zzhlhc 1 year ago
parent afa91e25be
commit 5e56c73956

@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.MapOptions;
import org.redisson.api.RFuture;
import org.redisson.api.RQueue;
import org.redisson.api.map.RetryableMapWriterAsync;
import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -100,7 +101,11 @@ public class MapWriteBehindTask {
if (options.getWriter() != null) {
options.getWriter().write(addedMap);
} else {
options.getWriterAsync().write(addedMap).toCompletableFuture().join();
((RetryableMapWriterAsync<Object, Object>) options.getWriterAsync())
.withRetryManager(commandExecutor.getServiceManager())
.write(addedMap)
.toCompletableFuture()
.join();
}
addedMap.clear();
}

@ -16,21 +16,28 @@
package org.redisson.api.map;
import org.redisson.api.MapOptions;
import org.redisson.connection.ServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class RetryableMapWriterAsync<K, V> implements MapWriterAsync<K, V> {
private static final Logger log = LoggerFactory.getLogger(RetryableMapWriterAsync.class);
private final MapOptions<K, V> options;
private final MapWriterAsync<K, V> mapWriterAsync;
private ServiceManager serviceManager;
public RetryableMapWriterAsync(MapOptions<K, V> options, MapWriterAsync<K, V> mapWriterAsync) {
this.options = options;
this.mapWriterAsync = mapWriterAsync;
@ -38,24 +45,24 @@ public class RetryableMapWriterAsync<K, V> implements MapWriterAsync<K, V> {
@Override
public CompletionStage<Void> write(Map<K, V> addedMap) {
return retryWrite(Math.max(1, options.getWriterRetryAttempts()), new LinkedHashMap<>(addedMap));
}
private CompletableFuture<Void> retryWrite(int leftAttempts, Map<K, V> addedMap) {
return CompletableFuture.runAsync(() -> {
//execute at least once
int leftAddAttempts = Math.max(1, options.getWriterRetryAttempts());
while (leftAddAttempts > 0) {
try {
//do write
mapWriterAsync.write(addedMap).toCompletableFuture().join();
break;
} catch (Exception exception) {
if (--leftAddAttempts == 0) {
throw exception;
} else {
log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), exception);
try {
Thread.sleep(options.getWriterRetryInterval());
} catch (InterruptedException ignore) {
}
}
try {
//do write
mapWriterAsync.write(addedMap).toCompletableFuture().join();
} catch (Exception exception) {
if (leftAttempts - 1 == 0) {
throw exception;
} else {
//only need serviceManager when exception happened
Objects.requireNonNull(serviceManager);
log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), exception);
serviceManager.newTimeout(t -> retryWrite(leftAttempts - 1, addedMap).toCompletableFuture().join()
, options.getWriterRetryInterval()
, TimeUnit.MILLISECONDS);
}
}
});
@ -85,4 +92,11 @@ public class RetryableMapWriterAsync<K, V> implements MapWriterAsync<K, V> {
}
});
}
public RetryableMapWriterAsync<K, V> withRetryManager(ServiceManager serviceManager) {
if (this.serviceManager == null) {
this.serviceManager = serviceManager;
}
return this;
}
}

Loading…
Cancel
Save