Support retry for WRITE_THROUGH and WRITE_BEHIND: avoid synchronization

Signed-off-by: zzhlhc <zhouzh_zzz@qq.com>
pull/5294/head
zzhlhc 1 year ago
parent 1b7556a353
commit 1585c33866

@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.MapOptions;
import org.redisson.api.RFuture;
import org.redisson.api.RQueue;
import org.redisson.api.map.RetryableMapWriterAsync;
import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -101,11 +100,7 @@ public class MapWriteBehindTask {
if (options.getWriter() != null) {
options.getWriter().write(addedMap);
} else {
((RetryableMapWriterAsync<Object, Object>) options.getWriterAsync())
.withRetryManager(commandExecutor.getServiceManager())
.write(addedMap)
.toCompletableFuture()
.join();
options.getWriterAsync().write(addedMap).toCompletableFuture().join();
}
addedMap.clear();
}

@ -21,6 +21,7 @@ import org.redisson.api.*;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.listener.MapPutListener;
import org.redisson.api.listener.MapRemoveListener;
import org.redisson.api.map.RetryableMapWriterAsync;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
@ -84,6 +85,11 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
this.writeBehindService = null;
writeBehindTask = null;
}
if (options != null
&& options.getWriterRetryAttempts()>1
&& options.getWriterAsync() != null){
((RetryableMapWriterAsync<Object, Object>) options.getWriterAsync()).setServiceManager(commandExecutor.getServiceManager());
}
}
public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
@ -108,6 +114,11 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
this.writeBehindService = null;
writeBehindTask = null;
}
if (options != null
&& options.getWriterRetryAttempts()>1
&& options.getWriterAsync() != null){
((RetryableMapWriterAsync<Object, Object>) options.getWriterAsync()).setServiceManager(commandExecutor.getServiceManager());
}
}
@Override

@ -21,9 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@ -38,6 +36,10 @@ public class RetryableMapWriterAsync<K, V> implements MapWriterAsync<K, V> {
private ServiceManager serviceManager;
public void setServiceManager(ServiceManager serviceManager) {
this.serviceManager = serviceManager;
}
public RetryableMapWriterAsync(MapOptions<K, V> options, MapWriterAsync<K, V> mapWriterAsync) {
this.options = options;
this.mapWriterAsync = mapWriterAsync;
@ -45,57 +47,67 @@ public class RetryableMapWriterAsync<K, V> implements MapWriterAsync<K, V> {
@Override
public CompletionStage<Void> write(Map<K, V> addedMap) {
return retryWrite(Math.max(1, options.getWriterRetryAttempts()), new LinkedHashMap<>(addedMap));
CompletableFuture<Void> result = new CompletableFuture<>();
retryWrite(Math.max(1, options.getWriterRetryAttempts()), addedMap, result);
return result;
}
private CompletableFuture<Void> retryWrite(int leftAttempts, Map<K, V> addedMap) {
return CompletableFuture.runAsync(() -> {
try {
//do write
mapWriterAsync.write(addedMap).toCompletableFuture().join();
} catch (Exception exception) {
if (leftAttempts - 1 == 0) {
throw exception;
} else {
//only need serviceManager when exception happened
Objects.requireNonNull(serviceManager);
log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), exception);
serviceManager.newTimeout(t -> retryWrite(leftAttempts - 1, addedMap).toCompletableFuture().join(),
private void retryWrite(int leftAttempts, Map<K, V> addedMap, CompletableFuture<Void> result) {
mapWriterAsync.write(addedMap).whenComplete((x, e) -> {
if (e == null) {
result.complete(null);
return;
}
if (leftAttempts - 1 <= 0) {
result.completeExceptionally(e);
return;
}
if (serviceManager == null) {
log.warn("The serviceManager is null so cannot retry writing keys: {}", addedMap);
result.completeExceptionally(e);
return;
}
log.warn("Unable to add keys: {}, will retry after {}ms", addedMap, options.getWriterRetryInterval(), e);
serviceManager.newTimeout(t -> retryWrite(leftAttempts - 1, addedMap, result),
options.getWriterRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
);
}
@Override
public CompletionStage<Void> delete(Collection<K> keys) {
return CompletableFuture.runAsync(() -> {
//execute at least once
int leftDeleteAttempts = Math.max(1, options.getWriterRetryAttempts());
while (leftDeleteAttempts > 0) {
try {
//do delete
mapWriterAsync.delete(keys).toCompletableFuture().join();
break;
} catch (Exception exception) {
if (--leftDeleteAttempts == 0) {
throw exception;
} else {
log.warn("Unable to delete keys: {}, will retry after {}ms", keys, options.getWriterRetryInterval(), exception);
try {
Thread.sleep(options.getWriterRetryInterval());
} catch (InterruptedException ignore) {
}
CompletableFuture<Void> result = new CompletableFuture<>();
retryDelete(Math.max(1, options.getWriterRetryAttempts()), keys, result);
return result;
}
private void retryDelete(int leftAttempts, Collection<K> keys, CompletableFuture<Void> result) {
mapWriterAsync.delete(keys).whenComplete((x, e) -> {
if (e == null) {
result.complete(null);
return;
}
if (leftAttempts - 1 <= 0) {
result.completeExceptionally(e);
return;
}
if (serviceManager == null) {
log.warn("The serviceManager is null so cannot retry deleting keys: {}", keys);
result.completeExceptionally(e);
return;
}
log.warn("Unable to delete keys: {}, will retry after {}ms", keys, options.getWriterRetryInterval(), e);
serviceManager.newTimeout(t -> retryDelete(leftAttempts - 1, keys, result),
options.getWriterRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
);
}
public RetryableMapWriterAsync<K, V> withRetryManager(ServiceManager serviceManager) {
if (this.serviceManager == null) {
this.serviceManager = serviceManager;
}
return this;
}
}

@ -1477,7 +1477,7 @@ public abstract class BaseMapTest extends BaseTest {
final RMap<String, String> map = redisson.getMap("test", options);
//do add
map.put("1", "11");
Thread.sleep(1400);
Thread.sleep(2400);
//assert add
Map<String, String> expectedMap = new HashMap<>();
@ -1490,7 +1490,7 @@ public abstract class BaseMapTest extends BaseTest {
//do delete
actualRetryTimes.set(0);
map.remove("1");
Thread.sleep(1400);
Thread.sleep(2400);
//assert delete
expectedMap.clear();

Loading…
Cancel
Save