refactoring

pull/5038/head
Nikita Koksharov 2 years ago
parent 2bac289bc2
commit 224cb1a487

@ -54,7 +54,7 @@ public class CommandBatchService extends CommandAsyncService {
public static class ConnectionEntry { public static class ConnectionEntry {
boolean firstCommand = true; boolean firstCommand = true;
CompletableFuture<RedisConnection> connectionFuture; volatile CompletableFuture<RedisConnection> connectionFuture;
Runnable cancelCallback; Runnable cancelCallback;
public CompletableFuture<RedisConnection> getConnectionFuture() { public CompletableFuture<RedisConnection> getConnectionFuture() {

@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/** /**
* *
@ -198,37 +199,31 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
} }
} }
} }
private static final AtomicReferenceFieldUpdater<ConnectionEntry, CompletableFuture> CONNECTION_LATCH =
AtomicReferenceFieldUpdater.newUpdater(ConnectionEntry.class,
CompletableFuture.class, "connectionFuture");
@Override @Override
protected CompletableFuture<RedisConnection> getConnection() { protected CompletableFuture<RedisConnection> getConnection() {
MasterSlaveEntry msEntry = getEntry(); MasterSlaveEntry msEntry = getEntry();
ConnectionEntry entry = connections.computeIfAbsent(msEntry, k -> new ConnectionEntry()); ConnectionEntry entry = connections.computeIfAbsent(msEntry, k -> new ConnectionEntry());
if (entry.getConnectionFuture() != null) { if (CONNECTION_LATCH.compareAndSet(entry, null, new CompletableFuture<>())) {
connectionFuture = entry.getConnectionFuture(); connectionFuture = entry.getConnectionFuture();
return connectionFuture; CompletableFuture<RedisConnection> cf;
}
synchronized (this) {
if (entry.getConnectionFuture() != null) {
connectionFuture = entry.getConnectionFuture();
return connectionFuture;
}
if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
connectionFuture = connectionWriteOp(null); cf = connectionWriteOp(null);
} else { } else {
connectionFuture = connectionReadOp(null); cf = connectionReadOp(null);
} }
connectionFuture.toCompletableFuture().join(); connectionManager.getServiceManager().transfer(cf, connectionFuture);
entry.setConnectionFuture(connectionFuture);
entry.setCancelCallback(() -> { entry.setCancelCallback(() -> {
handleError(connectionFuture, new CancellationException()); handleError(connectionFuture, new CancellationException());
}); });
return connectionFuture;
} }
return entry.getConnectionFuture();
} }
private MasterSlaveEntry getEntry() { private MasterSlaveEntry getEntry() {

@ -408,4 +408,15 @@ public class ServiceManager {
}); });
} }
public <V> void transfer(CompletionStage<V> future1, CompletableFuture<V> future2) {
future1.whenComplete((res, e) -> {
if (e != null) {
future2.completeExceptionally(e);
return;
}
future2.complete(res);
});
}
} }

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.Cache; import javax.cache.Cache;
import javax.cache.CacheException; import javax.cache.CacheException;
@ -52,17 +53,17 @@ import org.redisson.jcache.configuration.RedissonConfiguration;
public class JCacheManager implements CacheManager { public class JCacheManager implements CacheManager {
private static final EmptyStatisticsMXBean EMPTY_INSTANCE = new EmptyStatisticsMXBean(); private static final EmptyStatisticsMXBean EMPTY_INSTANCE = new EmptyStatisticsMXBean();
private static MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); private static final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
private final ClassLoader classLoader; private final ClassLoader classLoader;
private final CachingProvider cacheProvider; private final CachingProvider cacheProvider;
private final Properties properties; private final Properties properties;
private final URI uri; private final URI uri;
private final ConcurrentMap<String, JCache<?, ?>> caches = new ConcurrentHashMap<String, JCache<?, ?>>(); private final ConcurrentMap<String, JCache<?, ?>> caches = new ConcurrentHashMap<>();
private final ConcurrentMap<JCache<?, ?>, JCacheStatisticsMXBean> statBeans = new ConcurrentHashMap<JCache<?, ?>, JCacheStatisticsMXBean>(); private final ConcurrentMap<JCache<?, ?>, JCacheStatisticsMXBean> statBeans = new ConcurrentHashMap<>();
private final ConcurrentMap<JCache<?, ?>, JCacheManagementMXBean> managementBeans = new ConcurrentHashMap<JCache<?, ?>, JCacheManagementMXBean>(); private final ConcurrentMap<JCache<?, ?>, JCacheManagementMXBean> managementBeans = new ConcurrentHashMap<>();
private volatile boolean closed; private final AtomicBoolean closed = new AtomicBoolean();
private final Redisson redisson; private final Redisson redisson;
@ -96,7 +97,7 @@ public class JCacheManager implements CacheManager {
} }
private void checkNotClosed() { private void checkNotClosed() {
if (closed) { if (closed.get()) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
} }
@ -130,7 +131,7 @@ public class JCacheManager implements CacheManager {
} }
JCacheConfiguration<K, V> cfg = new JCacheConfiguration<K, V>(configuration); JCacheConfiguration<K, V> cfg = new JCacheConfiguration<K, V>(configuration);
JCache<K, V> cache = new JCache<K, V>(this, cacheRedisson, cacheName, cfg, hasOwnRedisson); JCache<K, V> cache = new JCache<>(this, cacheRedisson, cacheName, cfg, hasOwnRedisson);
JCache<?, ?> oldCache = caches.putIfAbsent(cacheName, cache); JCache<?, ?> oldCache = caches.putIfAbsent(cacheName, cache);
if (oldCache != null) { if (oldCache != null) {
throw new CacheException("Cache " + cacheName + " already exists"); throw new CacheException("Cache " + cacheName + " already exists");
@ -188,7 +189,7 @@ public class JCacheManager implements CacheManager {
@Override @Override
public Iterable<String> getCacheNames() { public Iterable<String> getCacheNames() {
return Collections.unmodifiableSet(new HashSet<String>(caches.keySet())); return Collections.unmodifiableSet(new HashSet<>(caches.keySet()));
} }
@Override @Override
@ -350,33 +351,26 @@ public class JCacheManager implements CacheManager {
@Override @Override
public void close() { public void close() {
if (isClosed()) { if (closed.compareAndSet(false, true)) {
return; if (cacheProvider != null) {
} cacheProvider.close(uri, classLoader);
}
synchronized (this) { for (Cache<?, ?> cache : caches.values()) {
if (!isClosed()) { try {
if (cacheProvider != null) { cache.close();
cacheProvider.close(uri, classLoader); } catch (Exception e) {
} // skip
for (Cache<?, ?> cache : caches.values()) {
try {
cache.close();
} catch (Exception e) {
// skip
}
}
if (redisson != null) {
redisson.shutdown();
} }
closed = true; }
if (redisson != null) {
redisson.shutdown();
} }
} }
} }
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return closed; return closed.get();
} }
@Override @Override

Loading…
Cancel
Save