From e923b269a2fdb352c7baae195726295e20a97b9b Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Jun 2015 16:11:38 +0300 Subject: [PATCH] unix sockets support. #173 --- pom.xml | 5 ++++ .../com/lambdaworks/redis/RedisClient.java | 8 ++++++- .../redis/protocol/CommandHandler.java | 4 +++- src/main/java/org/redisson/Config.java | 19 ++++++++++++--- .../connection/ClusterConnectionManager.java | 6 ++--- .../connection/ConnectionManager.java | 9 +++++--- .../MasterSlaveConnectionManager.java | 23 ++++++++++++++++--- .../redisson/connection/MasterSlaveEntry.java | 14 +++++------ .../connection/SentinelConnectionManager.java | 4 ++-- .../connection/SingleConnectionManager.java | 2 +- .../org/redisson/connection/SingleEntry.java | 8 +++---- 11 files changed, 72 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index 73dc4fb32..99ce08723 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,11 @@ + + io.netty + netty-transport-native-epoll + 4.0.27.Final + io.netty netty-buffer diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index 58bc359bd..911c543d6 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -12,6 +12,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; @@ -61,9 +62,14 @@ public class RedisClient { * @param port Server port. */ public RedisClient(EventLoopGroup group, String host, int port, int timeout) { + this(group, NioSocketChannel.class, host, port, timeout); + } + + public RedisClient(EventLoopGroup group, Class socketChannelClass, String host, + int port, int timeout2) { addr = new InetSocketAddress(host, port); - bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr); + bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); setDefaultTimeout(timeout, TimeUnit.MILLISECONDS); diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index d16a87f88..be7fff5d5 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -46,6 +46,8 @@ public class CommandHandler extends ChannelDuplexHandler { try { if (!input.isReadable()) return; + System.out.println("in: " + input.toString(CharsetUtil.UTF_8)); + buffer.discardReadBytes(); buffer.writeBytes(input); @@ -60,7 +62,7 @@ public class CommandHandler extends ChannelDuplexHandler { Command cmd = (Command) msg; ByteBuf buf = ctx.alloc().heapBuffer(); cmd.encode(buf); -// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); + System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); ctx.write(buf, promise); } diff --git a/src/main/java/org/redisson/Config.java b/src/main/java/org/redisson/Config.java index 30ecf6a43..e64804c77 100644 --- a/src/main/java/org/redisson/Config.java +++ b/src/main/java/org/redisson/Config.java @@ -31,7 +31,7 @@ public class Config { private MasterSlaveServersConfig masterSlaveServersConfig; private SingleServerConfig singleServerConfig; - + private ClusterServersConfig clusterServersConfig; /** @@ -44,10 +44,14 @@ public class Config { */ private RedissonCodec codec; + private boolean useLinuxNativeEpoll; + public Config() { } Config(Config oldConf) { + setUseLinuxNativeEpoll(oldConf.isUseLinuxNativeEpoll()); + if (oldConf.getCodec() == null) { // use it by default oldConf.setCodec(new JsonJacksonCodec()); @@ -100,7 +104,7 @@ public class Config { void setClusterServersConfig(ClusterServersConfig clusterServersConfig) { this.clusterServersConfig = clusterServersConfig; } - + public SingleServerConfig useSingleServer() { checkClusterServersConfig(); checkMasterSlaveServersConfig(); @@ -168,7 +172,7 @@ public class Config { throw new IllegalStateException("cluster servers config already used!"); } } - + private void checkSentinelServersConfig() { if (sentinelServersConfig != null) { throw new IllegalStateException("sentinel servers config already used!"); @@ -187,4 +191,13 @@ public class Config { } } + public boolean isUseLinuxNativeEpoll() { + return useLinuxNativeEpoll; + } + + public Config setUseLinuxNativeEpoll(boolean useLinuxNativeEpoll) { + this.useLinuxNativeEpoll = useLinuxNativeEpoll; + return this; + } + } diff --git a/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/src/main/java/org/redisson/connection/ClusterConnectionManager.java index 6d4b52a90..7b5471ffb 100644 --- a/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -54,7 +54,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { init(config); for (URI addr : cfg.getNodeAddresses()) { - RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout()); + RedisClient client = createClient(addr.getHost(), addr.getPort()); try { RedisAsyncConnection connection = client.connectAsync(); String nodesValue = get(connection.clusterNodes()); @@ -87,7 +87,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot()); c.setMasterAddress(partition.getMasterAddress()); - SingleEntry entry = new SingleEntry(codec, group, c); + SingleEntry entry = new SingleEntry(codec, this, c); entries.put(partition.getEndSlot(), entry); lastPartitions.put(partition.getEndSlot(), partition); } @@ -98,7 +98,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { public void run() { try { for (URI addr : cfg.getNodeAddresses()) { - final RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout()); + final RedisClient client = createClient(addr.getHost(), addr.getPort()); try { RedisAsyncConnection connection = client.connectAsync(); String nodesValue = get(connection.clusterNodes()); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index e1eb2eca1..cf3f99de4 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -22,6 +22,7 @@ import org.redisson.async.AsyncOperation; import org.redisson.async.SyncInterruptedOperation; import org.redisson.async.SyncOperation; +import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; @@ -33,16 +34,18 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; //TODO ping support public interface ConnectionManager { + RedisClient createClient(String host, int port); + V get(Future future); R read(String key, SyncOperation operation); R read(SyncOperation operation); - + R write(String key, SyncInterruptedOperation operation) throws InterruptedException; - + R write(SyncInterruptedOperation operation) throws InterruptedException; - + R write(String key, SyncOperation operation); R write(SyncOperation operation); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 856e5ff87..9e2845370 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -17,6 +17,10 @@ package org.redisson.connection; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -46,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lambdaworks.redis.RedisAsyncConnection; +import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.RedisConnectionException; import com.lambdaworks.redis.RedisException; @@ -71,6 +76,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected EventLoopGroup group; + protected Class socketChannelClass; + protected final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap(); protected MasterSlaveServersConfig config; @@ -92,15 +99,25 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected void init(MasterSlaveServersConfig config) { this.config = config; - MasterSlaveEntry entry = new MasterSlaveEntry(codec, group, config); + MasterSlaveEntry entry = new MasterSlaveEntry(codec, this, config); entries.put(Integer.MAX_VALUE, entry); } protected void init(Config cfg) { - this.group = new NioEventLoopGroup(cfg.getThreads()); + if (cfg.isUseLinuxNativeEpoll()) { + this.group = new EpollEventLoopGroup(cfg.getThreads()); + this.socketChannelClass = EpollSocketChannel.class; + } else { + this.group = new NioEventLoopGroup(cfg.getThreads()); + this.socketChannelClass = NioSocketChannel.class; + } this.codec = new RedisCodecWrapper(cfg.getCodec()); } + public RedisClient createClient(String host, int port) { + return new RedisClient(group, socketChannelClass, host, port, config.getTimeout()); + } + public FutureListener createReleaseWriteListener(final int slot, final RedisConnection conn, final Timeout timeout) { return new FutureListener() { @@ -258,7 +275,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } }); } - + public R write(String key, SyncInterruptedOperation operation) throws InterruptedException { int slot = calcSlot(key); return write(slot, operation, 0); diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 57ca25f9b..b634cf18e 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -15,8 +15,6 @@ */ package org.redisson.connection; -import io.netty.channel.EventLoopGroup; - import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -47,11 +45,11 @@ public class MasterSlaveEntry { final MasterSlaveServersConfig config; final RedisCodec codec; - final EventLoopGroup group; + final ConnectionManager connectionManager; - public MasterSlaveEntry(RedisCodec codec, EventLoopGroup group, MasterSlaveServersConfig config) { + public MasterSlaveEntry(RedisCodec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) { this.codec = codec; - this.group = group; + this.connectionManager = connectionManager; this.config = config; slaveBalancer = config.getLoadBalancer(); @@ -60,7 +58,7 @@ public class MasterSlaveEntry { List addresses = new ArrayList(config.getSlaveAddresses()); addresses.add(config.getMasterAddress()); for (URI address : addresses) { - RedisClient client = new RedisClient(group, address.getHost(), address.getPort(), config.getTimeout()); + RedisClient client = connectionManager.createClient(address.getHost(), address.getPort()); SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, config.getSlaveConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize()); @@ -74,7 +72,7 @@ public class MasterSlaveEntry { } public void setupMasterEntry(String host, int port) { - RedisClient masterClient = new RedisClient(group, host, port, config.getTimeout()); + RedisClient masterClient = connectionManager.createClient(host, port); masterEntry = new ConnectionEntry(masterClient, config.getMasterConnectionPoolSize()); } @@ -85,7 +83,7 @@ public class MasterSlaveEntry { public void addSlave(String host, int port) { slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort()); - RedisClient client = new RedisClient(group, host, port, config.getTimeout()); + RedisClient client = connectionManager.createClient(host, port); slaveBalancer.add(new SubscribesConnectionEntry(client, this.config.getSlaveConnectionPoolSize(), this.config.getSlaveSubscriptionConnectionPoolSize())); diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 34889ad56..7b41bb716 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -58,7 +58,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { final Set addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); for (URI addr : cfg.getSentinelAddresses()) { - RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout()); + RedisClient client = createClient(addr.getHost(), addr.getPort()); RedisAsyncConnection connection = client.connectAsync(); // TODO async @@ -93,7 +93,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { final Set freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); for (final URI addr : cfg.getSentinelAddresses()) { - RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout()); + RedisClient client = createClient(addr.getHost(), addr.getPort()); sentinels.add(client); RedisPubSubConnection pubsub = client.connectPubSub(); diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 80a0dd664..3aebc9d4c 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -41,7 +41,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { protected void init(MasterSlaveServersConfig config) { this.config = config; - SingleEntry entry = new SingleEntry(codec, group, config); + SingleEntry entry = new SingleEntry(codec, this, config); entries.put(Integer.MAX_VALUE, entry); } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 35e6d324d..fd7ca90f0 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -15,8 +15,6 @@ */ package org.redisson.connection; -import io.netty.channel.EventLoopGroup; - import org.redisson.MasterSlaveServersConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,13 +29,13 @@ public class SingleEntry extends MasterSlaveEntry { private final Logger log = LoggerFactory.getLogger(getClass()); - public SingleEntry(RedisCodec codec, EventLoopGroup group, MasterSlaveServersConfig config) { - super(codec, group, config); + public SingleEntry(RedisCodec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + super(codec, connectionManager, config); } @Override public void setupMasterEntry(String host, int port) { - RedisClient masterClient = new RedisClient(group, host, port, config.getTimeout()); + RedisClient masterClient = connectionManager.createClient(host, port); masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize()); }