diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index b12560106..6340a06c5 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -15,18 +15,17 @@ */ package org.redisson.client; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.RedissonShutdownException; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; import org.redisson.client.handler.CommandsQueue; -import org.redisson.client.protocol.CommandData; -import org.redisson.client.protocol.CommandsData; -import org.redisson.client.protocol.QueueCommand; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.handler.CommandsQueuePubSub; +import org.redisson.client.protocol.*; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -58,6 +57,9 @@ public class RedisConnection implements RedisCommands { private Runnable connectedListener; private Runnable disconnectedListener; + private volatile boolean pooled; + private AtomicInteger usage = new AtomicInteger(); + public RedisConnection(RedisClient redisClient, Channel channel, RPromise connectionPromise) { this.redisClient = redisClient; this.connectionPromise = connectionPromise; @@ -76,6 +78,26 @@ public class RedisConnection implements RedisCommands { } } + public int incUsage() { + return usage.incrementAndGet(); + } + + public int getUsage() { + return usage.get(); + } + + public int decUsage() { + return usage.decrementAndGet(); + } + + public boolean isPooled() { + return pooled; + } + + public void setPooled(boolean pooled) { + this.pooled = pooled; + } + public boolean isQueued() { return queued; } @@ -107,7 +129,17 @@ public class RedisConnection implements RedisCommands { } public CommandData getCurrentCommand() { - QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get(); + Queue queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get(); + if (queue != null) { + QueueCommandHolder holder = queue.peek(); + if (holder != null) { + if (holder.getCommand() instanceof CommandData) { + return (CommandData) holder.getCommand(); + } + } + } + + QueueCommand command = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).get(); if (command instanceof CommandData) { return (CommandData) command; } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index fe30115fe..5213d2606 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -47,10 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; +import java.util.*; /** * Redis protocol command decoder @@ -72,9 +69,18 @@ public class CommandDecoder extends ReplayingDecoder { this.scheme = scheme; } + protected QueueCommand getCommand(ChannelHandlerContext ctx) { + Queue queue = ctx.channel().attr(CommandsQueue.COMMANDS_QUEUE).get(); + QueueCommandHolder holder = queue.peek(); + if (holder != null) { + return holder.getCommand(); + } + return null; + } + @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); + QueueCommand data = getCommand(ctx); if (state() == null) { state(new State()); @@ -206,10 +212,8 @@ public class CommandDecoder extends ReplayingDecoder { } protected void sendNext(Channel channel) { - CommandsQueue handler = channel.pipeline().get(CommandsQueue.class); - if (handler != null) { - handler.sendNextCommand(channel); - } + Queue queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get(); + queue.poll(); state(null); } @@ -220,6 +224,10 @@ public class CommandDecoder extends ReplayingDecoder { while (in.writerIndex() > in.readerIndex()) { CommandData commandData = null; + if (commandBatch.getCommands().size() == i) { + break; + } + checkpoint(); state().setBatchIndex(i); diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java index 6875a24a7..19c003805 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -31,12 +31,6 @@ */ package org.redisson.client.handler; -import org.redisson.client.ChannelName; -import org.redisson.client.protocol.CommandData; -import org.redisson.client.protocol.RedisCommands; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; @@ -46,6 +40,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.CharsetUtil; +import org.redisson.client.ChannelName; +import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.RedisCommands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Redis protocol command encoder diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index aa2c5b59a..091ef5ba4 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -17,16 +17,14 @@ package org.redisson.client.handler; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import org.redisson.client.ChannelName; import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.CommandData; -import org.redisson.client.protocol.Decoder; -import org.redisson.client.protocol.QueueCommand; -import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.*; import org.redisson.client.protocol.decoder.ListObjectDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.pubsub.Message; @@ -64,6 +62,20 @@ public class CommandPubSubDecoder extends CommandDecoder { commands.put(new PubSubKey(channel, operation), data); } + @Override + protected QueueCommand getCommand(ChannelHandlerContext ctx) { + return ctx.channel().attr(CommandsQueuePubSub.CURRENT_COMMAND).get(); + } + + @Override + protected void sendNext(Channel channel) { + CommandsQueuePubSub handler = channel.pipeline().get(CommandsQueuePubSub.class); + if (handler != null) { + handler.sendNextCommand(channel); + } + state(null); + } + @Override protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { if (data == null) { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java index e49397bc8..ab96ce708 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -15,18 +15,19 @@ */ package org.redisson.client.handler; -import io.netty.channel.*; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.util.AttributeKey; -import org.redisson.client.ChannelName; import org.redisson.client.WriteRedisConnectionException; -import org.redisson.client.protocol.CommandData; -import org.redisson.client.protocol.QueueCommand; -import org.redisson.client.protocol.QueueCommandHolder; +import org.redisson.client.protocol.*; import org.redisson.misc.LogHelper; -import java.util.List; +import java.net.SocketAddress; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -36,82 +37,61 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class CommandsQueue extends ChannelDuplexHandler { - public static final AttributeKey CURRENT_COMMAND = AttributeKey.valueOf("promise"); + public static final AttributeKey> COMMANDS_QUEUE = AttributeKey.valueOf("COMMANDS_QUEUE"); - private final Queue queue = new ConcurrentLinkedQueue<>(); - - private final ChannelFutureListener listener = future -> { - if (!future.isSuccess() && future.channel().isActive()) { - sendNextCommand(future.channel()); - } - }; + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { + super.connect(ctx, remoteAddress, localAddress, promise); - public void sendNextCommand(Channel channel) { - QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).getAndSet(null); - if (command != null) { - queue.poll(); - } else { - QueueCommandHolder c = queue.peek(); - if (c != null) { - QueueCommand data = c.getCommand(); - List> pubSubOps = data.getPubSubOperations(); - if (!pubSubOps.isEmpty()) { - queue.poll(); - } - } - } - sendData(channel); + ctx.channel().attr(COMMANDS_QUEUE).set(new ConcurrentLinkedQueue<>()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - while (true) { - QueueCommandHolder command = queue.poll(); - if (command == null) { - break; + Queue queue = ctx.channel().attr(COMMANDS_QUEUE).get(); + Iterator iterator = queue.iterator(); + while (iterator.hasNext()) { + QueueCommandHolder command = iterator.next(); + + CommandData cc = (CommandData) command.getCommand(); + RedisCommand cmd = cc.getCommand(); + if (RedisCommands.BLOCKING_COMMAND_NAMES.contains(cmd.getName()) + || RedisCommands.BLOCKING_COMMANDS.contains(cmd)) { + continue; } - + + iterator.remove(); command.getChannelPromise().tryFailure( - new WriteRedisConnectionException("Channel has been closed! Can't write command: " + new WriteRedisConnectionException("Channel has been closed! Can't write command: " + LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel())); } - + super.channelInactive(ctx); } - + + private final AtomicBoolean lock = new AtomicBoolean(); + @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof QueueCommand) { QueueCommand data = (QueueCommand) msg; - QueueCommandHolder holder = queue.peek(); - if (holder != null && holder.getCommand() == data) { - super.write(ctx, msg, promise); - } else { - queue.add(new QueueCommandHolder(data, promise)); - sendData(ctx.channel()); - } - } else { - super.write(ctx, msg, promise); - } - } + QueueCommandHolder holder = new QueueCommandHolder(data, promise); + + Queue queue = ctx.channel().attr(COMMANDS_QUEUE).get(); - private void sendData(Channel ch) { - QueueCommandHolder command = queue.peek(); - if (command != null && command.trySend()) { - QueueCommand data = command.getCommand(); - List> pubSubOps = data.getPubSubOperations(); - if (!pubSubOps.isEmpty()) { - for (CommandData cd : pubSubOps) { - for (Object channel : cd.getParams()) { - ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd); + while (true) { + if (lock.compareAndSet(false, true)) { + try { + queue.add(holder); + ctx.writeAndFlush(data, holder.getChannelPromise()); + } finally { + lock.set(false); } + break; } - } else { - ch.attr(CURRENT_COMMAND).set(data); } - - command.getChannelPromise().addListener(listener); - ch.writeAndFlush(data, command.getChannelPromise()); + } else { + super.write(ctx, msg, promise); } } diff --git a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java index c4122e6e0..9da742ee4 100644 --- a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java +++ b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java @@ -87,8 +87,13 @@ public class RedisChannelInitializer extends ChannelInitializer { ch.pipeline().addLast( connectionWatchdog, CommandEncoder.INSTANCE, - CommandBatchEncoder.INSTANCE, - new CommandsQueue()); + CommandBatchEncoder.INSTANCE); + + if (type == Type.PLAIN) { + ch.pipeline().addLast(new CommandsQueue()); + } else { + ch.pipeline().addLast(new CommandsQueuePubSub()); + } if (pingConnectionHandler != null) { ch.pipeline().addLast(pingConnectionHandler); diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index bfd47c098..db6488499 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -20,13 +20,19 @@ import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.config.ReadMode; import org.redisson.pubsub.AsyncSemaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Deque; +import java.util.Iterator; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -43,8 +49,9 @@ public class ClientConnectionsEntry { private final AsyncSemaphore freeSubscribeConnectionsCounter; private final Queue allConnections = new ConcurrentLinkedQueue<>(); - private final Queue freeConnections = new ConcurrentLinkedQueue<>(); + private final Deque freeConnections = new ConcurrentLinkedDeque<>(); private final AsyncSemaphore freeConnectionsCounter; + private Iterator iter; public enum FreezeReason {MANAGER, RECONNECT, SYSTEM} @@ -76,6 +83,8 @@ public class ClientConnectionsEntry { freeConnections.remove(c); return allConnections.remove(c); }); + + iter = freeConnections.iterator(); } public boolean isMasterForRead() { @@ -148,8 +157,14 @@ public class ClientConnectionsEntry { return freeConnectionsCounter.getCounter(); } - public void acquireConnection(Runnable runnable) { - freeConnectionsCounter.acquire(runnable); + public void acquireConnection(Runnable runnable, RedisCommand command) { + if (command == null || RedisCommands.BLOCKING_COMMAND_NAMES.contains(command.getName()) + || RedisCommands.BLOCKING_COMMANDS.contains(command)) { + freeConnectionsCounter.acquire(runnable); + return; + } + + runnable.run(); } public void removeConnection(Runnable runnable) { @@ -160,8 +175,44 @@ public class ClientConnectionsEntry { freeConnectionsCounter.release(); } - public RedisConnection pollConnection() { - return freeConnections.poll(); + AtomicBoolean lock = new AtomicBoolean(); + + public RedisConnection pollConnection(RedisCommand command) { + if (command == null + || RedisCommands.BLOCKING_COMMAND_NAMES.contains(command.getName()) + || RedisCommands.BLOCKING_COMMANDS.contains(command)) { + while (true) { + if (lock.compareAndSet(false, true)) { + RedisConnection c = freeConnections.poll(); + lock.set(false); + if (c != null) { + c.incUsage(); + c.setPooled(true); + } + return c; + } + } + } + + while (true) { + if (lock.compareAndSet(false, true)) { + if (!iter.hasNext()) { + iter = freeConnections.iterator(); + } + try { + if (iter.hasNext()) { + RedisConnection c = iter.next(); + if (c != null) { + c.incUsage(); + } + return c; + } + return null; + } finally { + lock.set(false); + } + } + } } public void releaseConnection(RedisConnection connection) { @@ -175,7 +226,15 @@ public class ClientConnectionsEntry { } connection.setLastUsageTime(System.nanoTime()); - freeConnections.add(connection); + if (connection.getUsage() == 0) { + freeConnections.add(connection); + return; + } + connection.decUsage(); + if (connection.isPooled() && connection.getUsage() == 0) { + freeConnections.add(connection); + connection.setPooled(false); + } } public RFuture connect() { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index dab70cf6a..d30a2f35a 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -203,7 +203,7 @@ public class MasterSlaveEntry { reattachBlockingQueue(connection.getCurrentCommand()); } while (true) { - RedisConnection connection = entry.pollConnection(); + RedisConnection connection = entry.pollConnection(null); if (connection == null) { break; } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 4243c4895..c95683981 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -172,12 +172,11 @@ abstract class ConnectionPool { } }); } - }); - + }, null); } - protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) { - entry.acquireConnection(runnable); + protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand command) { + entry.acquireConnection(runnable, command); } protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); @@ -235,7 +234,7 @@ abstract class ConnectionPool { @Override public void run() { executed = true; - connectTo(entry, result); + connectTo(entry, result, command); } @Override @@ -248,7 +247,7 @@ abstract class ConnectionPool { }; result.onComplete(callback); - acquireConnection(entry, callback); + acquireConnection(entry, callback, command); return result; } @@ -261,20 +260,20 @@ abstract class ConnectionPool { return true; } - protected T poll(ClientConnectionsEntry entry) { - return (T) entry.pollConnection(); + protected T poll(ClientConnectionsEntry entry, RedisCommand command) { + return (T) entry.pollConnection(command); } protected RFuture connect(ClientConnectionsEntry entry) { return (RFuture) entry.connect(); } - private void connectTo(ClientConnectionsEntry entry, RPromise promise) { + private void connectTo(ClientConnectionsEntry entry, RPromise promise, RedisCommand command) { if (promise.isDone()) { releaseConnection(entry); return; } - T conn = poll(entry); + T conn = poll(entry, command); if (conn != null) { if (!conn.isActive() && entry.getNodeType() == NodeType.SLAVE) { entry.trySetupFistFail(); diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index 590d4c10e..45c63894c 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -17,6 +17,7 @@ package org.redisson.connection.pool; import org.redisson.api.RFuture; import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ClientConnectionsEntry; @@ -40,7 +41,7 @@ public class PubSubConnectionPool extends ConnectionPool } @Override - protected RedisPubSubConnection poll(ClientConnectionsEntry entry) { + protected RedisPubSubConnection poll(ClientConnectionsEntry entry, RedisCommand command) { return entry.pollSubscribeConnection(); } @@ -55,7 +56,7 @@ public class PubSubConnectionPool extends ConnectionPool } @Override - protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) { + protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand command) { entry.acquireSubscribeConnection(runnable); } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index cf5bc3497..1b1cffdf7 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -66,8 +66,49 @@ public class RedissonTest extends BaseTest { } } } - + } + + @Test + public void testPerformance() throws InterruptedException { + Config config = createConfig(); + config.useSingleServer().setConnectionPoolSize(1).setConnectionMinimumIdleSize(1); + RedissonClient inst = Redisson.create(config); + RAtomicLong s = inst.getAtomicLong("counter"); + + ExecutorService ex = Executors.newFixedThreadPool(16); + for (int i = 0; i < 500_000; i++) { + ex.execute(() -> { + long t = s.incrementAndGet(); + }); + } + + ex.shutdown(); + assertThat(ex.awaitTermination(5, TimeUnit.SECONDS)).isTrue(); + inst.shutdown(); + } + + @Test + public void testResponseHandling() throws InterruptedException { + List list = new ArrayList<>(); + for (int i = 0; i < 10000; i++) { + list.add(i); + } + RList l = redisson.getList("test"); + l.addAll(list); + ExecutorService e = Executors.newFixedThreadPool(8); + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 100; i++) { + e.submit(() -> { + for (int k = 0; k < 10000; k++) { + assertThat(l.get(k)).isEqualTo(k); + counter.incrementAndGet(); + } + }); + } + e.shutdown(); + assertThat(e.awaitTermination(30, TimeUnit.SECONDS)).isTrue(); + assertThat(counter.get()).isEqualTo(10000 * 100); } @Test