refactoring

pull/4113/head
Nikita Koksharov 3 years ago
parent c18a508479
commit 557b519139

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.springframework.data.redis.connection.DefaultMessage;
@ -69,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -99,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -67,7 +67,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}
@ -97,7 +97,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (CompletableFuture<?> future : list) {
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
}

@ -59,15 +59,14 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
CompletableFuture<RedissonCountDownLatchEntry> future = subscribe();
RedissonCountDownLatchEntry entry = commandExecutor.getInterrupted(future);
try {
commandExecutor.syncSubscriptionInterrupted(future);
while (getCount() > 0) {
// waiting for open state
commandExecutor.getNow(future).getLatch().await();
entry.getLatch().await();
}
} finally {
unsubscribe(commandExecutor.getNow(future));
unsubscribe(entry);
}
}

@ -100,10 +100,11 @@ public class RedissonLock extends RedissonBaseLock {
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
RedissonLockEntry entry;
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
entry = commandExecutor.getInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
entry = commandExecutor.get(future);
}
try {
@ -117,23 +118,23 @@ public class RedissonLock extends RedissonBaseLock {
// waiting for message
if (ttl >= 0) {
try {
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
commandExecutor.getNow(future).getLatch().acquire();
entry.getLatch().acquire();
} else {
commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(commandExecutor.getNow(future), threadId);
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}

@ -75,7 +75,7 @@ public class RedissonPatternTopic implements RPatternTopic {
private int addListener(RedisPubSubListener<?> pubSubListener) {
CompletableFuture<Collection<PubSubConnectionEntry>> future = subscribeService.psubscribe(channelName, codec, pubSubListener);
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
return System.identityHashCode(pubSubListener);
}
@ -115,7 +115,7 @@ public class RedissonPatternTopic implements RPatternTopic {
@Override
public void removeListener(int listenerId) {
commandExecutor.syncSubscription(removeListenerAsync(listenerId).toCompletableFuture());
commandExecutor.get(removeListenerAsync(listenerId).toCompletableFuture());
}
@Override
@ -138,7 +138,7 @@ public class RedissonPatternTopic implements RPatternTopic {
@Override
public void removeListener(PatternMessageListener<?> listener) {
CompletableFuture<Void> future = subscribeService.removeListenerAsync(PubSubType.PUNSUBSCRIBE, channelName, listener);
commandExecutor.syncSubscription(future);
commandExecutor.get(future);
}
@Override

@ -87,7 +87,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
CompletableFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscriptionInterrupted(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
try {
while (true) {
Long nearestTimeout;
@ -103,13 +103,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
if (nearestTimeout != null) {
commandExecutor.getNow(future).getLatch().tryAcquire(permits, nearestTimeout, TimeUnit.MILLISECONDS);
entry.getLatch().tryAcquire(permits, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(future).getLatch().acquire(permits);
entry.getLatch().acquire(permits);
}
}
} finally {
unsubscribe(commandExecutor.getNow(future));
unsubscribe(entry);
}
// return get(acquireAsync(permits, ttl, timeUnit));
}

@ -80,17 +80,17 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
CompletableFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscriptionInterrupted(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
try {
while (true) {
if (tryAcquire(permits)) {
return;
}
commandExecutor.getNow(future).getLatch().acquire();
entry.getLatch().acquire();
}
} finally {
unsubscribe(commandExecutor.getNow(future));
unsubscribe(entry);
}
// get(acquireAsync(permits));
}

@ -104,15 +104,13 @@ public class RedissonTopic implements RTopic {
@Override
public int addListener(StatusListener listener) {
RFuture<Integer> future = addListenerAsync(listener);
commandExecutor.syncSubscription(future.toCompletableFuture());
return commandExecutor.getNow(future.toCompletableFuture());
return commandExecutor.get(future.toCompletableFuture());
};
@Override
public <M> int addListener(Class<M> type, MessageListener<? extends M> listener) {
RFuture<Integer> future = addListenerAsync(type, (MessageListener<M>) listener);
commandExecutor.syncSubscription(future.toCompletableFuture());
return commandExecutor.getNow(future.toCompletableFuture());
return commandExecutor.get(future.toCompletableFuture());
}
@Override
@ -163,7 +161,7 @@ public class RedissonTopic implements RTopic {
@Override
public void removeListener(MessageListener<?> listener) {
RFuture<Void> future = removeListenerAsync(listener);
commandExecutor.syncSubscription(future.toCompletableFuture());
commandExecutor.get(future.toCompletableFuture());
}
@Override
@ -180,7 +178,7 @@ public class RedissonTopic implements RTopic {
@Override
public void removeListener(Integer... listenerIds) {
commandExecutor.syncSubscription(removeListenerAsync(listenerIds).toCompletableFuture());
commandExecutor.get(removeListenerAsync(listenerIds).toCompletableFuture());
}
@Override

@ -45,10 +45,6 @@ public interface CommandAsyncExecutor {
RedisException convertException(ExecutionException e);
void syncSubscription(CompletableFuture<?> future);
void syncSubscriptionInterrupted(CompletableFuture<?> future) throws InterruptedException;
<V> void transfer(CompletableFuture<V> future1, CompletableFuture<V> future2);
<V> V getNow(CompletableFuture<V> future);

@ -82,16 +82,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return objectBuilder != null;
}
@Override
public void syncSubscription(CompletableFuture<?> future) {
get(future);
}
@Override
public void syncSubscriptionInterrupted(CompletableFuture<?> future) throws InterruptedException {
getInterrupted(future);
}
@Override
public <V> V getNow(CompletableFuture<V> future) {
try {

Loading…
Cancel
Save