Improvement - performance gain for connection pool with few connections. #3704

pull/3742/head
Nikita Koksharov 4 years ago
parent 36491a3042
commit db65315d60

@ -15,18 +15,17 @@
*/
package org.redisson.client;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.handler.CommandsQueuePubSub;
import org.redisson.client.protocol.*;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -58,6 +57,9 @@ public class RedisConnection implements RedisCommands {
private Runnable connectedListener;
private Runnable disconnectedListener;
private volatile boolean pooled;
private AtomicInteger usage = new AtomicInteger();
public <C> RedisConnection(RedisClient redisClient, Channel channel, RPromise<C> connectionPromise) {
this.redisClient = redisClient;
this.connectionPromise = connectionPromise;
@ -76,6 +78,26 @@ public class RedisConnection implements RedisCommands {
}
}
public int incUsage() {
return usage.incrementAndGet();
}
public int getUsage() {
return usage.get();
}
public int decUsage() {
return usage.decrementAndGet();
}
public boolean isPooled() {
return pooled;
}
public void setPooled(boolean pooled) {
this.pooled = pooled;
}
public boolean isQueued() {
return queued;
}
@ -107,7 +129,17 @@ public class RedisConnection implements RedisCommands {
}
public CommandData<?, ?> getCurrentCommand() {
QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get();
Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
if (queue != null) {
QueueCommandHolder holder = queue.peek();
if (holder != null) {
if (holder.getCommand() instanceof CommandData) {
return (CommandData<?, ?>) holder.getCommand();
}
}
}
QueueCommand command = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).get();
if (command instanceof CommandData) {
return (CommandData<?, ?>) command;
}

@ -47,10 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.*;
/**
* Redis protocol command decoder
@ -72,9 +69,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
this.scheme = scheme;
}
protected QueueCommand getCommand(ChannelHandlerContext ctx) {
Queue<QueueCommandHolder> queue = ctx.channel().attr(CommandsQueue.COMMANDS_QUEUE).get();
QueueCommandHolder holder = queue.peek();
if (holder != null) {
return holder.getCommand();
}
return null;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
QueueCommand data = getCommand(ctx);
if (state() == null) {
state(new State());
@ -206,10 +212,8 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
protected void sendNext(Channel channel) {
CommandsQueue handler = channel.pipeline().get(CommandsQueue.class);
if (handler != null) {
handler.sendNextCommand(channel);
}
Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
queue.poll();
state(null);
}
@ -220,6 +224,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> commandData = null;
if (commandBatch.getCommands().size() == i) {
break;
}
checkpoint();
state().setBatchIndex(i);

@ -31,12 +31,6 @@
*/
package org.redisson.client.handler;
import org.redisson.client.ChannelName;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
@ -46,6 +40,11 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import org.redisson.client.ChannelName;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Redis protocol command encoder

@ -17,16 +17,14 @@ package org.redisson.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.*;
import org.redisson.client.protocol.decoder.ListObjectDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
@ -64,6 +62,20 @@ public class CommandPubSubDecoder extends CommandDecoder {
commands.put(new PubSubKey(channel, operation), data);
}
@Override
protected QueueCommand getCommand(ChannelHandlerContext ctx) {
return ctx.channel().attr(CommandsQueuePubSub.CURRENT_COMMAND).get();
}
@Override
protected void sendNext(Channel channel) {
CommandsQueuePubSub handler = channel.pipeline().get(CommandsQueuePubSub.class);
if (handler != null) {
handler.sendNextCommand(channel);
}
state(null);
}
@Override
protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception {
if (data == null) {

@ -15,18 +15,19 @@
*/
package org.redisson.client.handler;
import io.netty.channel.*;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import org.redisson.client.ChannelName;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import org.redisson.client.protocol.*;
import org.redisson.misc.LogHelper;
import java.util.List;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -36,82 +37,61 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class CommandsQueue extends ChannelDuplexHandler {
public static final AttributeKey<QueueCommand> CURRENT_COMMAND = AttributeKey.valueOf("promise");
public static final AttributeKey<Queue<QueueCommandHolder>> COMMANDS_QUEUE = AttributeKey.valueOf("COMMANDS_QUEUE");
private final Queue<QueueCommandHolder> queue = new ConcurrentLinkedQueue<>();
private final ChannelFutureListener listener = future -> {
if (!future.isSuccess() && future.channel().isActive()) {
sendNextCommand(future.channel());
}
};
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress, promise);
public void sendNextCommand(Channel channel) {
QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).getAndSet(null);
if (command != null) {
queue.poll();
} else {
QueueCommandHolder c = queue.peek();
if (c != null) {
QueueCommand data = c.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
queue.poll();
}
}
}
sendData(channel);
ctx.channel().attr(COMMANDS_QUEUE).set(new ConcurrentLinkedQueue<>());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
while (true) {
QueueCommandHolder command = queue.poll();
if (command == null) {
break;
Queue<QueueCommandHolder> queue = ctx.channel().attr(COMMANDS_QUEUE).get();
Iterator<QueueCommandHolder> iterator = queue.iterator();
while (iterator.hasNext()) {
QueueCommandHolder command = iterator.next();
CommandData cc = (CommandData) command.getCommand();
RedisCommand cmd = cc.getCommand();
if (RedisCommands.BLOCKING_COMMAND_NAMES.contains(cmd.getName())
|| RedisCommands.BLOCKING_COMMANDS.contains(cmd)) {
continue;
}
iterator.remove();
command.getChannelPromise().tryFailure(
new WriteRedisConnectionException("Channel has been closed! Can't write command: "
new WriteRedisConnectionException("Channel has been closed! Can't write command: "
+ LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel()));
}
super.channelInactive(ctx);
}
private final AtomicBoolean lock = new AtomicBoolean();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
QueueCommandHolder holder = queue.peek();
if (holder != null && holder.getCommand() == data) {
super.write(ctx, msg, promise);
} else {
queue.add(new QueueCommandHolder(data, promise));
sendData(ctx.channel());
}
} else {
super.write(ctx, msg, promise);
}
}
QueueCommandHolder holder = new QueueCommandHolder(data, promise);
Queue<QueueCommandHolder> queue = ctx.channel().attr(COMMANDS_QUEUE).get();
private void sendData(Channel ch) {
QueueCommandHolder command = queue.peek();
if (command != null && command.trySend()) {
QueueCommand data = command.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) {
for (Object channel : cd.getParams()) {
ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
while (true) {
if (lock.compareAndSet(false, true)) {
try {
queue.add(holder);
ctx.writeAndFlush(data, holder.getChannelPromise());
} finally {
lock.set(false);
}
break;
}
} else {
ch.attr(CURRENT_COMMAND).set(data);
}
command.getChannelPromise().addListener(listener);
ch.writeAndFlush(data, command.getChannelPromise());
} else {
super.write(ctx, msg, promise);
}
}

@ -87,8 +87,13 @@ public class RedisChannelInitializer extends ChannelInitializer<Channel> {
ch.pipeline().addLast(
connectionWatchdog,
CommandEncoder.INSTANCE,
CommandBatchEncoder.INSTANCE,
new CommandsQueue());
CommandBatchEncoder.INSTANCE);
if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandsQueue());
} else {
ch.pipeline().addLast(new CommandsQueuePubSub());
}
if (pingConnectionHandler != null) {
ch.pipeline().addLast(pingConnectionHandler);

@ -20,13 +20,19 @@ import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.ReadMode;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Deque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -43,8 +49,9 @@ public class ClientConnectionsEntry {
private final AsyncSemaphore freeSubscribeConnectionsCounter;
private final Queue<RedisConnection> allConnections = new ConcurrentLinkedQueue<>();
private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<>();
private final Deque<RedisConnection> freeConnections = new ConcurrentLinkedDeque<>();
private final AsyncSemaphore freeConnectionsCounter;
private Iterator<RedisConnection> iter;
public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}
@ -76,6 +83,8 @@ public class ClientConnectionsEntry {
freeConnections.remove(c);
return allConnections.remove(c);
});
iter = freeConnections.iterator();
}
public boolean isMasterForRead() {
@ -148,8 +157,14 @@ public class ClientConnectionsEntry {
return freeConnectionsCounter.getCounter();
}
public void acquireConnection(Runnable runnable) {
freeConnectionsCounter.acquire(runnable);
public void acquireConnection(Runnable runnable, RedisCommand<?> command) {
if (command == null || RedisCommands.BLOCKING_COMMAND_NAMES.contains(command.getName())
|| RedisCommands.BLOCKING_COMMANDS.contains(command)) {
freeConnectionsCounter.acquire(runnable);
return;
}
runnable.run();
}
public void removeConnection(Runnable runnable) {
@ -160,8 +175,44 @@ public class ClientConnectionsEntry {
freeConnectionsCounter.release();
}
public RedisConnection pollConnection() {
return freeConnections.poll();
AtomicBoolean lock = new AtomicBoolean();
public RedisConnection pollConnection(RedisCommand<?> command) {
if (command == null
|| RedisCommands.BLOCKING_COMMAND_NAMES.contains(command.getName())
|| RedisCommands.BLOCKING_COMMANDS.contains(command)) {
while (true) {
if (lock.compareAndSet(false, true)) {
RedisConnection c = freeConnections.poll();
lock.set(false);
if (c != null) {
c.incUsage();
c.setPooled(true);
}
return c;
}
}
}
while (true) {
if (lock.compareAndSet(false, true)) {
if (!iter.hasNext()) {
iter = freeConnections.iterator();
}
try {
if (iter.hasNext()) {
RedisConnection c = iter.next();
if (c != null) {
c.incUsage();
}
return c;
}
return null;
} finally {
lock.set(false);
}
}
}
}
public void releaseConnection(RedisConnection connection) {
@ -175,7 +226,15 @@ public class ClientConnectionsEntry {
}
connection.setLastUsageTime(System.nanoTime());
freeConnections.add(connection);
if (connection.getUsage() == 0) {
freeConnections.add(connection);
return;
}
connection.decUsage();
if (connection.isPooled() && connection.getUsage() == 0) {
freeConnections.add(connection);
connection.setPooled(false);
}
}
public RFuture<RedisConnection> connect() {

@ -203,7 +203,7 @@ public class MasterSlaveEntry {
reattachBlockingQueue(connection.getCurrentCommand());
}
while (true) {
RedisConnection connection = entry.pollConnection();
RedisConnection connection = entry.pollConnection(null);
if (connection == null) {
break;
}

@ -172,12 +172,11 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
});
}
});
}, null);
}
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
entry.acquireConnection(runnable);
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand<?> command) {
entry.acquireConnection(runnable, command);
}
protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
@ -235,7 +234,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
@Override
public void run() {
executed = true;
connectTo(entry, result);
connectTo(entry, result, command);
}
@Override
@ -248,7 +247,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
};
result.onComplete(callback);
acquireConnection(entry, callback);
acquireConnection(entry, callback, command);
return result;
}
@ -261,20 +260,20 @@ abstract class ConnectionPool<T extends RedisConnection> {
return true;
}
protected T poll(ClientConnectionsEntry entry) {
return (T) entry.pollConnection();
protected T poll(ClientConnectionsEntry entry, RedisCommand<?> command) {
return (T) entry.pollConnection(command);
}
protected RFuture<T> connect(ClientConnectionsEntry entry) {
return (RFuture<T>) entry.connect();
}
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise, RedisCommand<?> command) {
if (promise.isDone()) {
releaseConnection(entry);
return;
}
T conn = poll(entry);
T conn = poll(entry, command);
if (conn != null) {
if (!conn.isActive() && entry.getNodeType() == NodeType.SLAVE) {
entry.trySetupFistFail();

@ -17,6 +17,7 @@ package org.redisson.connection.pool;
import org.redisson.api.RFuture;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
@ -40,7 +41,7 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
}
@Override
protected RedisPubSubConnection poll(ClientConnectionsEntry entry) {
protected RedisPubSubConnection poll(ClientConnectionsEntry entry, RedisCommand<?> command) {
return entry.pollSubscribeConnection();
}
@ -55,7 +56,7 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
}
@Override
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand<?> command) {
entry.acquireSubscribeConnection(runnable);
}

@ -66,8 +66,49 @@ public class RedissonTest extends BaseTest {
}
}
}
}
@Test
public void testPerformance() throws InterruptedException {
Config config = createConfig();
config.useSingleServer().setConnectionPoolSize(1).setConnectionMinimumIdleSize(1);
RedissonClient inst = Redisson.create(config);
RAtomicLong s = inst.getAtomicLong("counter");
ExecutorService ex = Executors.newFixedThreadPool(16);
for (int i = 0; i < 500_000; i++) {
ex.execute(() -> {
long t = s.incrementAndGet();
});
}
ex.shutdown();
assertThat(ex.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
inst.shutdown();
}
@Test
public void testResponseHandling() throws InterruptedException {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
RList<Integer> l = redisson.getList("test");
l.addAll(list);
ExecutorService e = Executors.newFixedThreadPool(8);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 100; i++) {
e.submit(() -> {
for (int k = 0; k < 10000; k++) {
assertThat(l.get(k)).isEqualTo(k);
counter.incrementAndGet();
}
});
}
e.shutdown();
assertThat(e.awaitTermination(30, TimeUnit.SECONDS)).isTrue();
assertThat(counter.get()).isEqualTo(10000 * 100);
}
@Test

Loading…
Cancel
Save