refactoring

pull/2450/head
Nikita Koksharov 5 years ago
parent 6635071eb5
commit 0c7ea7f46b

@ -145,7 +145,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
try { try {
future.await(); future.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
((RPromise)future).tryFailure(e); ((RPromise) future).tryFailure(e);
throw e; throw e;
} }

@ -15,17 +15,12 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress; import io.netty.channel.ChannelFuture;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
@ -37,17 +32,16 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.CountableListener; import org.redisson.misc.*;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PubSubConnectionEntry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelFuture; import java.net.InetSocketAddress;
import io.netty.channel.ChannelFutureListener; import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* *
@ -238,33 +232,21 @@ public class MasterSlaveEntry {
log.error("Can't resubscribe blocking queue " + commandData, e); log.error("Can't resubscribe blocking queue " + commandData, e);
return; return;
} }
AtomicBoolean skip = new AtomicBoolean();
BiConsumer<Object, Throwable> listener = new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object t, Throwable u) {
if (skip.get()) {
return;
}
releaseWrite(newConnection);
}
};
commandData.getPromise().onComplete(listener);
if (commandData.getPromise().isDone()) { if (commandData.getPromise().isDone()) {
releaseWrite(newConnection);
return; return;
} }
ChannelFuture channelFuture = newConnection.send(commandData); ChannelFuture channelFuture = newConnection.send(commandData);
channelFuture.addListener(new ChannelFutureListener() { channelFuture.addListener(future -> {
@Override if (!future.isSuccess()) {
public void operationComplete(ChannelFuture future) throws Exception { commandData.getPromise().tryFailure(new RedisException("Can't resubscribe blocking queue " + commandData + " to " + newConnection));
if (!future.isSuccess()) {
listener.accept(null, null);
skip.set(true);
releaseWrite(newConnection);
log.error("Can't resubscribe blocking queue {}", commandData);
}
} }
}); });
commandData.getPromise().onComplete((r, ex) -> {
releaseWrite(newConnection);
});
}); });
} }

@ -542,7 +542,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName()); Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName());
assertThat(System.currentTimeMillis() - startTime).isBetween(3000L, 3200L); assertThat(System.currentTimeMillis() - startTime).isBetween(2900L, 3200L);
assertThat(value).isEqualTo(3); assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6); assertThat(queue2).containsExactly(3, 4, 5, 6);
} }

@ -1340,12 +1340,6 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
assertTrue(ConcurrentHashMap.class.isAssignableFrom(ts.getContent().getClass())); assertTrue(ConcurrentHashMap.class.isAssignableFrom(ts.getContent().getClass()));
assertFalse(RMap.class.isAssignableFrom(ts.getContent().getClass())); assertFalse(RMap.class.isAssignableFrom(ts.getContent().getClass()));
ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(10);
abq.add("111");
ts.setContent(abq);
assertTrue(ArrayBlockingQueue.class.isAssignableFrom(ts.getContent().getClass()));
assertFalse(RBlockingQueue.class.isAssignableFrom(ts.getContent().getClass()));
ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>(); ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();
ts.setContent(clq); ts.setContent(clq);
assertTrue(ConcurrentLinkedQueue.class.isAssignableFrom(ts.getContent().getClass())); assertTrue(ConcurrentLinkedQueue.class.isAssignableFrom(ts.getContent().getClass()));

Loading…
Cancel
Save